|
@@ -1549,52 +1549,65 @@ class ArchiveRecreater:
|
|
|
"""Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
|
|
|
logger.info('Found %s, will resume interrupted operation', target_name)
|
|
|
old_target = self.open_archive(target_name)
|
|
|
- resume_id = old_target.metadata[b'recreate_source_id']
|
|
|
- resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
|
|
|
- if resume_id != archive.id:
|
|
|
- logger.warning('Source archive changed, will discard %s and start over', target_name)
|
|
|
- logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
|
|
|
- logger.warning('Current fingerprint: %s', archive.fpr)
|
|
|
- old_target.delete(Statistics(), progress=self.progress)
|
|
|
- return None, None # can't resume
|
|
|
- if resume_args != sys.argv[1:]:
|
|
|
- logger.warning('Command line changed, this might lead to inconsistencies')
|
|
|
- logger.warning('Saved: %s', repr(resume_args))
|
|
|
- logger.warning('Current: %s', repr(sys.argv[1:]))
|
|
|
+ if not self.can_resume(archive, old_target, target_name):
|
|
|
+ return None, None
|
|
|
target = self.create_target_archive(target_name + '.temp')
|
|
|
logger.info('Replaying items from interrupted operation...')
|
|
|
- item = None
|
|
|
- for item in old_target.iter_items():
|
|
|
- if 'chunks' in item:
|
|
|
- for chunk in item.chunks:
|
|
|
- self.cache.chunk_incref(chunk.id, target.stats)
|
|
|
- target.stats.nfiles += 1
|
|
|
- target.add_item(item)
|
|
|
- if item:
|
|
|
- resume_from = item.path
|
|
|
- else:
|
|
|
- resume_from = None
|
|
|
- if self.progress:
|
|
|
- old_target.stats.show_progress(final=True)
|
|
|
- target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
|
|
|
- for chunk_id, size, csize in target.recreate_partial_chunks:
|
|
|
+ last_old_item = self.copy_items(old_target, target)
|
|
|
+ resume_from = getattr(last_old_item, 'path', None)
|
|
|
+ self.incref_partial_chunks(old_target, target)
|
|
|
+ old_target.delete(Statistics(), progress=self.progress)
|
|
|
+ logger.info('Done replaying items')
|
|
|
+ return target, resume_from
|
|
|
+
|
|
|
+ def incref_partial_chunks(self, source_archive, target_archive):
|
|
|
+ target_archive.recreate_partial_chunks = source_archive.metadata.get(b'recreate_partial_chunks', [])
|
|
|
+ for chunk_id, size, csize in target_archive.recreate_partial_chunks:
|
|
|
if not self.cache.seen_chunk(chunk_id):
|
|
|
try:
|
|
|
# Repository has __contains__, RemoteRepository doesn't
|
|
|
- self.repository.get(chunk_id)
|
|
|
+ # `chunk_id in repo` doesn't read the data though, so we try to use that if possible.
|
|
|
+ get_or_in = getattr(self.repository, '__contains__', self.repository.get)
|
|
|
+ if get_or_in(chunk_id) is False:
|
|
|
+ raise Repository.ObjectNotFound(chunk_id, self.repository)
|
|
|
except Repository.ObjectNotFound:
|
|
|
# delete/prune/check between invocations: these chunks are gone.
|
|
|
- target.recreate_partial_chunks = None
|
|
|
+ target_archive.recreate_partial_chunks = None
|
|
|
break
|
|
|
# fast-lane insert into chunks cache
|
|
|
self.cache.chunks[chunk_id] = (1, size, csize)
|
|
|
- target.stats.update(size, csize, True)
|
|
|
+ target_archive.stats.update(size, csize, True)
|
|
|
continue
|
|
|
- # incref now, otherwise old_target.delete() might delete these chunks
|
|
|
- self.cache.chunk_incref(chunk_id, target.stats)
|
|
|
- old_target.delete(Statistics(), progress=self.progress)
|
|
|
- logger.info('Done replaying items')
|
|
|
- return target, resume_from
|
|
|
+ # incref now, otherwise a source_archive.delete() might delete these chunks
|
|
|
+ self.cache.chunk_incref(chunk_id, target_archive.stats)
|
|
|
+
|
|
|
+ def copy_items(self, source_archive, target_archive):
|
|
|
+ item = None
|
|
|
+ for item in source_archive.iter_items():
|
|
|
+ if 'chunks' in item:
|
|
|
+ for chunk in item.chunks:
|
|
|
+ self.cache.chunk_incref(chunk.id, target_archive.stats)
|
|
|
+ target_archive.stats.nfiles += 1
|
|
|
+ target_archive.add_item(item)
|
|
|
+ if self.progress:
|
|
|
+ target_archive.stats.show_progress(final=True)
|
|
|
+ return item
|
|
|
+
|
|
|
+ def can_resume(self, archive, old_target, target_name):
|
|
|
+ resume_id = old_target.metadata[b'recreate_source_id']
|
|
|
+ resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
|
|
|
+ if resume_id != archive.id:
|
|
|
+ logger.warning('Source archive changed, will discard %s and start over', target_name)
|
|
|
+ logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
|
|
|
+ logger.warning('Current fingerprint: %s', archive.fpr)
|
|
|
+ old_target.delete(Statistics(), progress=self.progress)
|
|
|
+ return False
|
|
|
+ if resume_args != sys.argv[1:]:
|
|
|
+ logger.warning('Command line changed, this might lead to inconsistencies')
|
|
|
+ logger.warning('Saved: %s', repr(resume_args))
|
|
|
+ logger.warning('Current: %s', repr(sys.argv[1:]))
|
|
|
+ # Just warn in this case, don't start over
|
|
|
+ return True
|
|
|
|
|
|
def create_target_archive(self, name):
|
|
|
target = Archive(self.repository, self.key, self.manifest, name, create=True,
|