@@ -21,11 +21,11 @@ logger = create_logger()
from . import xattr
from .cache import ChunkListEntry
from .chunker import Chunker
-from .compress import Compressor
+from .compress import Compressor, CompressionSpec
from .constants import * # NOQA
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Manifest
-from .helpers import Chunk, ChunkIteratorFileWrapper, open_item
+from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .helpers import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
@@ -36,7 +36,6 @@ from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import PathPrefixPattern, FnmatchPattern
-from .helpers import CompressionDecider1, CompressionDecider2, CompressionSpec
from .item import Item, ArchiveItem
from .key import key_factory
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
@@ -196,7 +195,7 @@ class DownloadPipeline:
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
-        for _, data in self.fetch_many(ids):
+        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
@@ -238,7 +237,9 @@ class ChunkBuffer:
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
-        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
+        # The chunker returns a memoryview to its internal buffer,
+        # thus a copy is needed before resuming the chunker iterator.
+        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
@@ -246,7 +247,7 @@
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
-            self.buffer.write(chunks[-1].data)
+            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
@@ -260,7 +261,7 @@ class CacheChunkBuffer(ChunkBuffer):
        self.stats = stats

    def write_chunk(self, chunk):
-        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats, wait=False)
+        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
        self.cache.repository.async_response(wait=False)
        return id_

@@ -278,7 +279,7 @@ class Archive:

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, progress=False,
-                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None, compression=None, compression_files=None,
+                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
@@ -307,12 +308,8 @@ class Archive:
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
-            self.file_compression_logger = create_logger('borg.debug.file-compression')
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
-            self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
-                                                             compression_files or [])
-            key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
@@ -330,7 +327,7 @@ class Archive:
        self.zeros = None

    def _load_meta(self, id):
-        _, data = self.key.decrypt(id, self.repository.get(id))
+        data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
@@ -469,7 +466,7 @@ Utilization of max. archive size: {csize_max:.0%}
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
-        self.cache.add_chunk(self.id, Chunk(data), self.stats)
+        self.cache.add_chunk(self.id, data, self.stats)
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
@@ -495,7 +492,7 @@ Utilization of max. archive size: {csize_max:.0%}
            add(self.id)
            for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
                add(id)
-                _, data = self.key.decrypt(id, chunk)
+                data = self.key.decrypt(id, chunk)
                unpacker.feed(data)
                for item in unpacker:
                    chunks = item.get(b'chunks')
@@ -525,7 +522,7 @@ Utilization of max. archive size: {csize_max:.0%}
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
-                for _, data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
+                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
@@ -589,7 +586,7 @@ Utilization of max. archive size: {csize_max:.0%}
                self.zeros = b'\0' * (1 << self.chunker_params[1])
            with fd:
                ids = [c.id for c in item.chunks]
-                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
+                for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    with backup_io('write'):
@@ -717,7 +714,7 @@ Utilization of max. archive size: {csize_max:.0%}
            setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
-        self.cache.add_chunk(new_id, Chunk(data), self.stats)
+        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id
@@ -764,7 +761,7 @@ Utilization of max. archive size: {csize_max:.0%}
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
-            _, data = self.key.decrypt(items_id, data)
+            data = self.key.decrypt(items_id, data)
            unpacker.feed(data)
            chunk_decref(items_id, stats)
            try:
@@ -879,10 +876,10 @@ Utilization of max. archive size: {csize_max:.0%}
                self.write_checkpoint()
        return length, number

-    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
+    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None):
        if not chunk_processor:
            def chunk_processor(data):
-                chunk_entry = cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats, wait=False)
+                chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

@@ -971,12 +968,10 @@ Utilization of max. archive size: {csize_max:.0%}
        if chunks is not None:
            item.chunks = chunks
        else:
-            compress = self.compression_decider1.decide(path)
-            self.file_compression_logger.debug('%s -> compression %s', path, compress.name)
            with backup_io('open'):
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
-                self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress)
+                self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
@@ -1212,9 +1207,9 @@ class ArchiveChecker:
                chunk_ids = list(reversed(chunk_ids_revd))
                chunk_data_iter = self.repository.get_many(chunk_ids)
            else:
+                _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
                try:
-                    _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
-                    _, data = self.key.decrypt(_chunk_id, encrypted_data)
+                    self.key.decrypt(_chunk_id, encrypted_data)
                except IntegrityError as integrity_error:
                    self.error_found = True
                    errors += 1
@@ -1284,7 +1279,7 @@ class ArchiveChecker:
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
-                _, data = self.key.decrypt(chunk_id, cdata)
+                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
@@ -1329,9 +1324,9 @@ class ArchiveChecker:
            self.possibly_superseded.add(id_)

        def add_callback(chunk):
-            id_ = self.key.id_hash(chunk.data)
+            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
-            add_reference(id_, len(chunk.data), len(cdata), cdata)
+            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
@@ -1352,7 +1347,7 @@ class ArchiveChecker:
        def replacement_chunk(size):
            data = bytes(size)
            chunk_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(Chunk(data))
+            cdata = self.key.encrypt(data)
            csize = len(cdata)
            return chunk_id, size, csize, cdata

@@ -1461,7 +1456,7 @@ class ArchiveChecker:
            if state > 0:
                unpacker.resync()
            for chunk_id, cdata in zip(items, repository.get_many(items)):
-                _, data = self.key.decrypt(chunk_id, cdata)
+                data = self.key.decrypt(chunk_id, cdata)
                unpacker.feed(data)
                try:
                    for item in unpacker:
@@ -1511,7 +1506,7 @@ class ArchiveChecker:
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
-            _, data = self.key.decrypt(archive_id, cdata)
+            data = self.key.decrypt(archive_id, cdata)
            archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
            if archive.version != 1:
                raise Exception('Unknown archive metadata version')
@@ -1528,7 +1523,7 @@ class ArchiveChecker:
            archive.items = items_buffer.chunks
            data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(Chunk(data))
+            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            self.manifest.archives[info.name] = (new_archive_id, info.ts)

@@ -1562,7 +1557,7 @@ class ArchiveRecreater:

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
-                 chunker_params=None, compression=None, compression_files=None, always_recompress=False,
+                 chunker_params=None, compression=None, always_recompress=False,
                 dry_run=False, stats=False, progress=False, file_status_printer=None,
                 checkpoint_interval=1800):
        self.repository = repository
@@ -1583,9 +1578,6 @@ class ArchiveRecreater:
        self.always_recompress = always_recompress
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
-        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
-                                                         compression_files or [])
-        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))

        self.dry_run = dry_run
        self.stats = stats
@@ -1654,24 +1646,21 @@ class ArchiveRecreater:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item.chunks
        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
-        compress = self.compression_decider1.decide(item.path)
-        chunk_processor = partial(self.chunk_processor, target, compress)
+        chunk_processor = partial(self.chunk_processor, target)
        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)

-    def chunk_processor(self, target, compress, data):
+    def chunk_processor(self, target, data):
        chunk_id = self.key.id_hash(data)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
-        chunk = Chunk(data, compress=compress)
-        compression_spec, chunk = self.key.compression_decider2.decide(chunk)
        overwrite = self.recompress
        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
            # Check if this chunk is already compressed the way we want it
            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
-            if Compressor.detect(old_chunk.data).name == compression_spec.name:
+            if Compressor.detect(old_chunk).name == self.key.compressor.decide(data).name:
                # Stored chunk has the same compression we wanted
                overwrite = False
-        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False)
+        chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
        self.cache.repository.async_response(wait=False)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry
@@ -1685,7 +1674,7 @@ class ArchiveRecreater:
            yield from target.chunker.chunkify(file)
        else:
            for chunk in chunk_iterator:
-                yield chunk.data
+                yield chunk

    def save(self, archive, target, comment=None, replace_original=True):
        if self.dry_run:
@@ -1756,7 +1745,7 @@ class ArchiveRecreater:
    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
-                         checkpoint_interval=self.checkpoint_interval, compression=self.compression)
+                         checkpoint_interval=self.checkpoint_interval)
        return target

    def open_archive(self, name, **kwargs):