@@ -1394,10 +1394,6 @@ class ArchiveChecker:
 
 
 class ArchiveRecreater:
-    class FakeTargetArchive:
-        def __init__(self):
-            self.stats = Statistics()
-
     class Interrupted(Exception):
         def __init__(self, metadata=None):
             self.metadata = metadata or {}
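The FakeTargetArchive stub existed solely so that dry runs had a stats attribute to write to; it becomes redundant because the dry-run case now short-circuits before any target is needed (see the checkpoint_interval, save() and create_target() hunks below). Note that the removed branch in create_target() still returned a (target, None) pair while the normal path returns a bare target, apparently a leftover from an older signature. A minimal sketch of the two styles, using stand-in names rather than borg's real call sites:

    class Statistics:
        """Stand-in for borg's Statistics accumulator."""

    # Before: hand dry runs a stub so stats-updating code has a target.
    class FakeTargetArchive:
        def __init__(self):
            self.stats = Statistics()

    # After: guard the effectful step itself; no stub object required.
    def save(target, dry_run=False):
        if dry_run:
            return  # nothing is written
        ...  # create and commit the real target archive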
@@ -1421,6 +1417,9 @@ class ArchiveRecreater:
         self.exclude_if_present = exclude_if_present or []
         self.keep_tag_files = keep_tag_files
 
+        self.rechunkify = chunker_params is not None
+        if self.rechunkify:
+            logger.debug('Rechunking archives to %s', chunker_params)
         self.chunker_params = chunker_params or CHUNKER_PARAMS
         self.recompress = bool(compression)
         self.always_recompress = always_recompress
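The new rechunkify flag separates "which chunker params to use" from "did the user ask to rechunk at all": only an explicitly passed value (e.g. via --chunker-params) requests rechunking, while None means "keep the source chunks", even though self.chunker_params still falls back to the defaults for any target archive that gets created. A self-contained sketch of that resolution (the CHUNKER_PARAMS value is assumed here for illustration):

    CHUNKER_PARAMS = (19, 23, 21, 4095)  # assumed defaults, illustration only

    def resolve_chunker_params(chunker_params):
        # Only an explicit request turns rechunking on; the fallback is
        # still needed because a target archive always wants some params.
        rechunkify = chunker_params is not None
        return rechunkify, (chunker_params or CHUNKER_PARAMS)

    assert resolve_chunker_params(None) == (False, CHUNKER_PARAMS)
    assert resolve_chunker_params((10, 23, 16, 4095)) == (True, (10, 23, 16, 4095))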
@@ -1434,7 +1433,7 @@ class ArchiveRecreater:
         self.stats = stats
         self.progress = progress
         self.print_file_status = file_status_printer or (lambda *args: None)
-        self.checkpoint_interval = checkpoint_interval
+        self.checkpoint_interval = None if dry_run else checkpoint_interval
 
     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
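Nulling checkpoint_interval on dry runs means a simulation can never trigger checkpointing. Presumably the interval feeds a time-based guard along these lines (an illustrative assumption, not borg's exact code):

    import time

    def maybe_checkpoint(recreater, last_checkpoint):
        # A None interval never fires, so dry runs write no checkpoints.
        interval = recreater.checkpoint_interval
        if interval and time.monotonic() - last_checkpoint > interval:
            ...  # write a checkpoint archive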
@@ -1444,10 +1443,10 @@ class ArchiveRecreater:
             self.matcher_add_tagged_dirs(archive)
         if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
             logger.info("Skipping archive %s, nothing to do", archive_name)
-            return True
+            return
         self.process_items(archive, target)
         replace_original = target_name is None
-        return self.save(archive, target, comment, replace_original=replace_original)
+        self.save(archive, target, comment, replace_original=replace_original)
 
     def process_items(self, archive, target):
         matcher = self.matcher
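recreate() previously returned True on the nothing-to-do path and whatever save() returned otherwise; with both returns dropped it is now called purely for its side effects. A hypothetical caller (constructor arguments elided) simply stops inspecting the result and relies on exceptions such as ArchiveRecreater.Interrupted for failure signalling:

    recreater = ArchiveRecreater(...)  # arguments elided
    recreater.recreate('my-archive')   # no status value to check any more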
@@ -1494,12 +1493,11 @@ class ArchiveRecreater:
         self.print_file_status(file_status(item.mode), item.path)
 
     def process_chunks(self, archive, target, item):
-        """Return new chunk ID list for 'item'."""
         if not self.recompress and not target.recreate_rechunkify:
             for chunk_id, size, csize in item.chunks:
                 self.cache.chunk_incref(chunk_id, target.stats)
             return item.chunks
-        chunk_iterator = self.create_chunk_iterator(archive, target, list(item.chunks))
+        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         compress = self.compression_decider1.decide(item.path)
         chunk_processor = partial(self.chunk_processor, target, compress)
         target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
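The fast path at the top of process_chunks() is worth noting: when neither recompression nor rechunking is requested, the target archive reuses the source's chunks as-is, and only their reference counts are bumped so the deduplicating cache knows about the second owner. A toy model of that refcount bookkeeping (not borg's cache implementation):

    refcounts = {}

    def chunk_incref(chunk_id):
        # Reusing a chunk means one more archive references it; no data moves.
        refcounts[chunk_id] = refcounts.get(chunk_id, 0) + 1

    for chunk_id in [b'aa', b'bb', b'aa']:
        chunk_incref(chunk_id)
    assert refcounts == {b'aa': 2, b'bb': 1}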
@@ -1517,24 +1515,22 @@ class ArchiveRecreater:
             if Compressor.detect(old_chunk.data).name == compression_spec['name']:
                 # Stored chunk has the same compression we wanted
                 overwrite = False
-        chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
-        self.seen_chunks.add(chunk_id)
-        return chunk_id, size, csize
+        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
+        self.seen_chunks.add(chunk_entry.id)
+        return chunk_entry
 
-    def create_chunk_iterator(self, archive, target, chunks):
-        """Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
+    def iter_chunks(self, archive, target, chunks):
         chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
         if target.recreate_rechunkify:
             # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
             # (does not load the entire file into memory)
             file = ChunkIteratorFileWrapper(chunk_iterator)
-            return target.chunker.chunkify(file)
+            yield from target.chunker.chunkify(file)
         else:
             for chunk in chunk_iterator:
                 yield chunk.data
 
     def save(self, archive, target, comment=None, replace_original=True):
-        """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         if self.dry_run:
             return
         timestamp = archive.ts.replace(tzinfo=None)
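Two things happen in this hunk. First, chunk_processor() passes through the entry returned by cache.add_chunk() (accessing its id field) instead of unpacking and rebuilding an (id, size, csize) triple. Second, the rename of create_chunk_iterator() to iter_chunks() comes with a real fix: because the function body contains a yield, it is a generator, so the old "return target.chunker.chunkify(file)" in the rechunkify branch would discard that iterator rather than produce its items; yield from makes both branches feed the same generator. A minimal illustration of the pitfall:

    def broken(rechunkify, chunks):
        if rechunkify:
            return iter(chunks)  # inside a generator, this only ends iteration
        else:
            for chunk in chunks:
                yield chunk

    def fixed(rechunkify, chunks):
        if rechunkify:
            yield from iter(chunks)
        else:
            for chunk in chunks:
                yield chunk

    assert list(broken(True, [1, 2])) == []   # the returned iterator is lost
    assert list(fixed(True, [1, 2])) == [1, 2]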
@@ -1591,12 +1587,13 @@ class ArchiveRecreater:
 
     def create_target(self, archive, target_name=None):
         """Create target archive."""
-        if self.dry_run:
-            return self.FakeTargetArchive(), None
         target_name = target_name or archive.name + '.recreate'
         target = self.create_target_archive(target_name)
         # If the archives use the same chunker params, then don't rechunkify
-        target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params', [])) != self.chunker_params
+        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
+        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
+        if target.recreate_rechunkify:
+            logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
         return target
 
     def create_target_archive(self, name):
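The rechunkify decision is now strictly opt-in. Previously, an archive with no recorded chunker_params in its metadata yielded an empty tuple, which compared unequal to self.chunker_params and therefore forced rechunking; now the comparison only matters when the user actually requested new params. A runnable comparison of the two predicates (default value assumed for illustration):

    CHUNKER_PARAMS = (19, 23, 21, 4095)  # assumed defaults, illustration only

    def old_decision(source_params, self_chunker_params=CHUNKER_PARAMS):
        return tuple(source_params) != self_chunker_params

    def new_decision(source_params, rechunkify, target_chunker_params=CHUNKER_PARAMS):
        return rechunkify and tuple(source_params) != target_chunker_params

    # Archive created before chunker params were recorded in metadata:
    assert old_decision([]) is True                     # was always rechunked
    assert new_decision([], rechunkify=False) is False  # now left untouched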
|