@@ -19,6 +19,7 @@ logger = create_logger()
 from . import xattr
 from .cache import ChunkListEntry
 from .chunker import Chunker
+from .compress import Compressor
 from .constants import *  # NOQA
 from .hashindex import ChunkIndex, ChunkIndexEntry
 from .helpers import Manifest
@@ -1298,7 +1299,7 @@ class ArchiveRecreater:
 
     def __init__(self, repository, manifest, key, cache, matcher,
                  exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
-                 chunker_params=None, compression=None, compression_files=None,
+                 chunker_params=None, compression=None, compression_files=None, always_recompress=False,
                  dry_run=False, stats=False, progress=False, file_status_printer=None):
         self.repository = repository
         self.key = key
@@ -1312,10 +1313,11 @@ class ArchiveRecreater:
 
         self.chunker_params = chunker_params or CHUNKER_PARAMS
         self.recompress = bool(compression)
+        self.always_recompress = always_recompress
         self.compression = compression or CompressionSpec('none')
         self.seen_chunks = set()
         self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
-                                                        compression_files or [])
+                                                        compression_files or [])
         key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
 
         self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
@@ -1329,10 +1331,10 @@ class ArchiveRecreater:
         self.interrupt = False
         self.errors = False
 
-    def recreate(self, archive_name, comment=None):
+    def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
         archive = self.open_archive(archive_name)
-        target, resume_from = self.create_target_or_resume(archive)
+        target, resume_from = self.create_target_or_resume(archive, target_name)
         if self.exclude_if_present or self.exclude_caches:
             self.matcher_add_tagged_dirs(archive)
         if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
@@ -1342,7 +1344,8 @@ class ArchiveRecreater:
             self.process_items(archive, target, resume_from)
         except self.Interrupted as e:
             return self.save(archive, target, completed=False, metadata=e.metadata)
-        return self.save(archive, target, comment)
+        replace_original = target_name is None
+        return self.save(archive, target, comment, replace_original=replace_original)
 
     def process_items(self, archive, target, resume_from=None):
         matcher = self.matcher
@@ -1404,7 +1407,6 @@ class ArchiveRecreater:
 
     def process_chunks(self, archive, target, item):
         """Return new chunk ID list for 'item'."""
-        # TODO: support --compression-from
         if not self.recompress and not target.recreate_rechunkify:
             for chunk_id, size, csize in item.chunks:
                 self.cache.chunk_incref(chunk_id, target.stats)
@@ -1412,13 +1414,22 @@ class ArchiveRecreater:
         new_chunks = self.process_partial_chunks(target)
         chunk_iterator = self.create_chunk_iterator(archive, target, item)
         consume(chunk_iterator, len(new_chunks))
+        compress = self.compression_decider1.decide(item.path)
         for chunk in chunk_iterator:
+            chunk.meta['compress'] = compress
             chunk_id = self.key.id_hash(chunk.data)
             if chunk_id in self.seen_chunks:
                 new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
             else:
-                # TODO: detect / skip / --always-recompress
-                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
+                compression_spec, chunk = self.key.compression_decider2.decide(chunk)
+                overwrite = self.recompress
+                if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
+                    # Check if this chunk is already compressed the way we want it
+                    old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
+                    if Compressor.detect(old_chunk.data).name == compression_spec['name']:
+                        # Stored chunk has the same compression we wanted
+                        overwrite = False
+                chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
                 new_chunks.append((chunk_id, size, csize))
                 self.seen_chunks.add(chunk_id)
                 if self.recompress:
@@ -1465,7 +1476,7 @@ class ArchiveRecreater:
         logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
         return partial_chunks
 
-    def save(self, archive, target, comment=None, completed=True, metadata=None):
+    def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True):
         """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
         if self.dry_run:
             return completed
@@ -1477,8 +1488,9 @@ class ArchiveRecreater:
             'cmdline': archive.metadata[b'cmdline'],
             'recreate_cmdline': sys.argv,
         })
-        archive.delete(Statistics(), progress=self.progress)
-        target.rename(archive.name)
+        if replace_original:
+            archive.delete(Statistics(), progress=self.progress)
+            target.rename(archive.name)
         if self.stats:
             target.end = datetime.utcnow()
             log_multi(DASHES,
@@ -1530,11 +1542,11 @@ class ArchiveRecreater:
         matcher.add(tag_files, True)
         matcher.add(tagged_dirs, False)
 
-    def create_target_or_resume(self, archive):
+    def create_target_or_resume(self, archive, target_name=None):
         """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
         if self.dry_run:
             return self.FakeTargetArchive(), None
-        target_name = archive.name + '.recreate'
+        target_name = target_name or archive.name + '.recreate'
         resume = target_name in self.manifest.archives
         target, resume_from = None, None
         if resume: