@@ -48,6 +48,7 @@ from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import cache_if_remote
 from .repository import Repository, LIST_SCAN_LIMIT
+from .repoobj import RepoObj


 has_link = hasattr(os, "link")
@@ -262,9 +263,9 @@ def OsOpen(*, flags, path=None, parent_fd=None, name=None, noatime=False, op="op


 class DownloadPipeline:
-    def __init__(self, repository, key):
+    def __init__(self, repository, repo_objs):
         self.repository = repository
-        self.key = key
+        self.repo_objs = repo_objs

     def unpack_many(self, ids, *, filter=None, preload=False):
         """
@@ -308,8 +309,9 @@ class DownloadPipeline:
             yield item

     def fetch_many(self, ids, is_preloaded=False):
-        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
-            yield self.key.decrypt(id_, data)
+        for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
+            _, data = self.repo_objs.parse(id_, cdata)
+            yield data


 class ChunkBuffer:
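The `fetch_many` hunk above shows the read-path convention this diff applies everywhere: the repository hands back raw `cdata`, and the new `RepoObj.parse()` authenticates/decrypts it and returns a `(meta, data)` tuple, instead of `key.decrypt()` returning bare bytes. A minimal sketch of that pattern as a standalone helper (hypothetical helper name; `repository` and `repo_objs` are the borg objects used in the hunks):

```python
def fetch_one(repository, repo_objs, id_, *, decompress=True):
    """Sketch of the new read path: get raw cdata, then parse it."""
    cdata = repository.get(id_)  # raw repo object bytes, still encrypted
    # parse() returns (meta, data); most call sites in this diff discard meta as `_`
    meta, data = repo_objs.parse(id_, cdata, decompress=decompress)
    return data
```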
@@ -391,12 +393,12 @@ def get_item_uid_gid(item, *, numeric, uid_forced=None, gid_forced=None, uid_def
     return uid, gid


-def archive_get_items(metadata, key, repository):
+def archive_get_items(metadata, *, repo_objs, repository):
     if "item_ptrs" in metadata:  # looks like a v2+ archive
         assert "items" not in metadata
         items = []
-        for id, data in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
-            data = key.decrypt(id, data)
+        for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
+            _, data = repo_objs.parse(id, cdata)
             ids = msgpack.unpackb(data)
             items.extend(ids)
         return items
@@ -406,16 +408,16 @@ def archive_get_items(metadata, key, repository):
     return metadata.items


-def archive_put_items(chunk_ids, *, key, cache=None, stats=None, add_reference=None):
+def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_reference=None):
     """gets a (potentially large) list of archive metadata stream chunk ids and writes them to repo objects"""
     item_ptrs = []
     for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
         data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
-        id = key.id_hash(data)
+        id = repo_objs.id_hash(data)
         if cache is not None and stats is not None:
             cache.add_chunk(id, data, stats)
         elif add_reference is not None:
-            cdata = key.encrypt(id, data)
+            cdata = repo_objs.format(id, {}, data)
             add_reference(id, len(data), cdata)
         else:
             raise NotImplementedError
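`archive_put_items` shows the matching write-path convention: `repo_objs.id_hash()` replaces `key.id_hash()`, and `repo_objs.format(id, meta, data)` replaces `key.encrypt(id, data)`, with a (for now empty) per-object metadata dict as the new middle argument. A hedged sketch of that pattern (hypothetical helper; `repository.put()` is assumed to accept the formatted cdata as-is):

```python
def store_one(repository, repo_objs, data):
    """Sketch of the new write path: hash plaintext, format, then put."""
    id_ = repo_objs.id_hash(data)            # ids are still computed over plaintext
    cdata = repo_objs.format(id_, {}, data)  # {} = per-object metadata, unused so far
    repository.put(id_, cdata)
    return id_
```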
@@ -435,8 +437,6 @@ class Archive:

     def __init__(
         self,
-        repository,
-        key,
         manifest,
         name,
         cache=None,
@@ -458,10 +458,12 @@ class Archive:
         iec=False,
     ):
         self.cwd = os.getcwd()
-        self.key = key
-        self.repository = repository
-        self.cache = cache
+        assert isinstance(manifest, Manifest)
         self.manifest = manifest
+        self.key = manifest.repo_objs.key
+        self.repo_objs = manifest.repo_objs
+        self.repository = manifest.repository
+        self.cache = cache
         self.stats = Statistics(output_json=log_json, iec=iec)
         self.iec = iec
         self.show_progress = progress
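With these two hunks, `Archive` callers no longer pass `repository` and `key` separately; the `Manifest` becomes the single source of truth and the constructor derives the related objects from it. Roughly (a sketch, omitting the many other keyword arguments `Archive` accepts):

```python
# old: three separately-passed objects that had to agree with each other
# archive = Archive(repository, key, manifest, "name", cache=cache)

# new: the manifest carries repository, key and repo_objs itself
archive = Archive(manifest, "name", cache=cache)
```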
@@ -488,7 +490,7 @@ class Archive:
         end = datetime.now().astimezone()  # local time with local timezone
         self.end = end
         self.consider_part_files = consider_part_files
-        self.pipeline = DownloadPipeline(self.repository, self.key)
+        self.pipeline = DownloadPipeline(self.repository, self.repo_objs)
         self.create = create
         if self.create:
             self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
@@ -507,12 +509,13 @@ class Archive:
             self.load(info.id)

     def _load_meta(self, id):
-        data = self.key.decrypt(id, self.repository.get(id))
+        cdata = self.repository.get(id)
+        _, data = self.repo_objs.parse(id, cdata)
         metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
         if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
             raise Exception("Unknown archive metadata version")
         # note: metadata.items must not get written to disk!
-        metadata.items = archive_get_items(metadata, self.key, self.repository)
+        metadata.items = archive_get_items(metadata, repo_objs=self.repo_objs, repository=self.repository)
         return metadata

     def load(self, id):
@@ -626,7 +629,9 @@ Duration: {0.duration}
         if name in self.manifest.archives:
             raise self.AlreadyExists(name)
         self.items_buffer.flush(flush=True)
-        item_ptrs = archive_put_items(self.items_buffer.chunks, key=self.key, cache=self.cache, stats=self.stats)
+        item_ptrs = archive_put_items(
+            self.items_buffer.chunks, repo_objs=self.repo_objs, cache=self.cache, stats=self.stats
+        )
         duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
         if timestamp is None:
             end = datetime.now().astimezone()  # local time with local timezone
@@ -660,7 +665,7 @@ Duration: {0.duration}
         metadata.update(additional_metadata or {})
         metadata = ArchiveItem(metadata)
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
-        self.id = self.key.id_hash(data)
+        self.id = self.repo_objs.id_hash(data)
         try:
             self.cache.add_chunk(self.id, data, self.stats)
         except IntegrityError as err:
@@ -699,7 +704,7 @@ Duration: {0.duration}
         for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
             pi.show(increase=1)
             add(id)
-            data = self.key.decrypt(id, chunk)
+            _, data = self.repo_objs.parse(id, chunk)
             sync.feed(data)
         unique_size = archive_index.stats_against(cache.chunks)[1]
         pi.finish()
@@ -1011,7 +1016,7 @@ Duration: {0.duration}
         for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
             if progress:
                 pi.show(i)
-            data = self.key.decrypt(items_id, data)
+            _, data = self.repo_objs.parse(items_id, data)
             unpacker.feed(data)
             chunk_decref(items_id, stats)
             try:
@@ -1666,6 +1671,7 @@ class ArchiveChecker:
             logger.error("Repository contains no apparent data at all, cannot continue check/repair.")
             return False
         self.key = self.make_key(repository)
+        self.repo_objs = RepoObj(self.key)
         if verify_data:
             self.verify_data()
         if Manifest.MANIFEST_ID not in self.chunks:
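`ArchiveChecker` cannot assume a readable manifest, so it wraps the key it derived itself in a `RepoObj` rather than taking `manifest.repo_objs`. As the import and this call suggest, constructing the layer appears to need only the key (it derives `id_hash` and the compressor from it; see the `repo_objs.compressor` use further down). A minimal sketch of the wiring, under that assumption:

```python
from borg.repoobj import RepoObj

def repo_objs_for_check(key):
    # checker path: no trustworthy manifest exists yet, so the object layer is
    # built directly from the derived key (normal code uses manifest.repo_objs)
    return RepoObj(key)  # exposes .parse(), .format(), .id_hash, .compressor
```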
@@ -1674,7 +1680,7 @@ class ArchiveChecker:
             self.manifest = self.rebuild_manifest()
         else:
             try:
-                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
+                self.manifest = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
             except IntegrityErrorBase as exc:
                 logger.error("Repository manifest is corrupted: %s", exc)
                 self.error_found = True
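A related API cleanup rides along here: `Manifest.load()` now returns just the `Manifest` instead of the old `(manifest, key)` tuple, since the manifest object itself exposes the key (and, per the `ArchiveRecreater` hunks below, `repository` and `repo_objs` too). Call shape only, with `repository` and `key` as in the hunk above:

```python
manifest = Manifest.load(repository, (Manifest.Operation.CHECK,), key=key)
key = manifest.key              # still reachable; no tuple unpacking needed
repo_objs = manifest.repo_objs  # the usual way to obtain the object layer
```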
@@ -1765,7 +1771,7 @@ class ArchiveChecker:
                 chunk_data_iter = self.repository.get_many(chunk_ids)
             else:
                 try:
-                    self.key.decrypt(chunk_id, encrypted_data, decompress=decompress)
+                    self.repo_objs.parse(chunk_id, encrypted_data, decompress=decompress)
                 except IntegrityErrorBase as integrity_error:
                     self.error_found = True
                     errors += 1
@@ -1796,7 +1802,7 @@ class ArchiveChecker:
                 # from the underlying media.
                 try:
                     encrypted_data = self.repository.get(defect_chunk)
-                    self.key.decrypt(defect_chunk, encrypted_data, decompress=decompress)
+                    self.repo_objs.parse(defect_chunk, encrypted_data, decompress=decompress)
                 except IntegrityErrorBase:
                     # failed twice -> get rid of this chunk
                     del self.chunks[defect_chunk]
@@ -1844,7 +1850,7 @@ class ArchiveChecker:
             pi.show()
             cdata = self.repository.get(chunk_id)
             try:
-                data = self.key.decrypt(chunk_id, cdata)
+                _, data = self.repo_objs.parse(chunk_id, cdata)
             except IntegrityErrorBase as exc:
                 logger.error("Skipping corrupted chunk: %s", exc)
                 self.error_found = True
@@ -1890,7 +1896,7 @@ class ArchiveChecker:

         def add_callback(chunk):
             id_ = self.key.id_hash(chunk)
-            cdata = self.key.encrypt(id_, chunk)
+            cdata = self.repo_objs.format(id_, {}, chunk)
             add_reference(id_, len(chunk), cdata)
             return id_

@@ -1913,7 +1919,7 @@ class ArchiveChecker:
         def replacement_chunk(size):
             chunk = Chunk(None, allocation=CH_ALLOC, size=size)
             chunk_id, data = cached_hash(chunk, self.key.id_hash)
-            cdata = self.key.encrypt(chunk_id, data)
+            cdata = self.repo_objs.format(chunk_id, {}, data)
             return chunk_id, size, cdata

         offset = 0
@@ -2032,7 +2038,7 @@ class ArchiveChecker:
                 return True, ""

             i = 0
-            archive_items = archive_get_items(archive, self.key, repository)
+            archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository)
             for state, items in groupby(archive_items, missing_chunk_detector):
                 items = list(items)
                 if state % 2:
@@ -2044,7 +2050,7 @@ class ArchiveChecker:
                     unpacker.resync()
                     for chunk_id, cdata in zip(items, repository.get_many(items)):
                         try:
-                            data = self.key.decrypt(chunk_id, cdata)
+                            _, data = self.repo_objs.parse(chunk_id, cdata)
                             unpacker.feed(data)
                             for item in unpacker:
                                 valid, reason = valid_item(item)
@@ -2057,7 +2063,7 @@ class ArchiveChecker:
                                         i,
                                     )
                         except IntegrityError as integrity_error:
-                            # key.decrypt() detected integrity issues.
+                            # repo_objs.parse() detected integrity issues.
                            # maybe the repo gave us a valid cdata, but not for the chunk_id we wanted.
                            # or the authentication of cdata failed, meaning the encrypted data was corrupted.
                            report(str(integrity_error), chunk_id, i)
@@ -2098,7 +2104,7 @@ class ArchiveChecker:
                 mark_as_possibly_superseded(archive_id)
             cdata = self.repository.get(archive_id)
             try:
-                data = self.key.decrypt(archive_id, cdata)
+                _, data = self.repo_objs.parse(archive_id, cdata)
             except IntegrityError as integrity_error:
                 logger.error("Archive metadata block %s is corrupted: %s", bin_to_hex(archive_id), integrity_error)
                 self.error_found = True
@@ -2114,14 +2120,18 @@ class ArchiveChecker:
                 verify_file_chunks(info.name, item)
                 items_buffer.add(item)
             items_buffer.flush(flush=True)
-            for previous_item_id in archive_get_items(archive, self.key, self.repository):
+            for previous_item_id in archive_get_items(
+                archive, repo_objs=self.repo_objs, repository=self.repository
+            ):
                 mark_as_possibly_superseded(previous_item_id)
             for previous_item_ptr in archive.item_ptrs:
                 mark_as_possibly_superseded(previous_item_ptr)
-            archive.item_ptrs = archive_put_items(items_buffer.chunks, key=self.key, add_reference=add_reference)
+            archive.item_ptrs = archive_put_items(
+                items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
+            )
            data = msgpack.packb(archive.as_dict())
            new_archive_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(new_archive_id, data)
+            cdata = self.repo_objs.format(new_archive_id, {}, data)
            add_reference(new_archive_id, len(data), cdata)
            self.manifest.archives[info.name] = (new_archive_id, info.ts)
        pi.finish()
@@ -2162,9 +2172,7 @@ class ArchiveRecreater:

     def __init__(
         self,
-        repository,
         manifest,
-        key,
         cache,
         matcher,
         exclude_caches=False,
@@ -2181,9 +2189,10 @@ class ArchiveRecreater:
         timestamp=None,
         checkpoint_interval=1800,
     ):
-        self.repository = repository
-        self.key = key
         self.manifest = manifest
+        self.repository = manifest.repository
+        self.key = manifest.key
+        self.repo_objs = manifest.repo_objs
         self.cache = cache

         self.matcher = matcher
@@ -2260,9 +2269,12 @@ class ArchiveRecreater:
             overwrite = self.recompress
             if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
                 # Check if this chunk is already compressed the way we want it
-                old_chunk = self.key.decrypt(chunk_id, self.repository.get(chunk_id), decompress=False)
+                _, old_chunk = self.repo_objs.parse(chunk_id, self.repository.get(chunk_id), decompress=False)
                 compressor_cls, level = Compressor.detect(old_chunk)
-                if compressor_cls.name == self.key.compressor.decide(data).name and level == self.key.compressor.level:
+                if (
+                    compressor_cls.name == self.repo_objs.compressor.decide(data).name
+                    and level == self.repo_objs.compressor.level
+                ):
                     # Stored chunk has the same compression method and level as we wanted
                     overwrite = False
             chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
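This recompression check is the one place that parses without decompressing: with `decompress=False` the returned payload is still compressed, so `Compressor.detect()` can sniff the stored method and level, and the rewrite is skipped when they already match what the configured compressor would choose. Distilled into a hypothetical predicate (names follow the hunk above):

```python
from borg.compress import Compressor

def needs_recompress(repo_objs, chunk_id, cdata, data):
    """True if the stored chunk is not compressed the way we want it."""
    # decompress=False keeps the payload compressed so its header stays detectable
    _, old_chunk = repo_objs.parse(chunk_id, cdata, decompress=False)
    compressor_cls, level = Compressor.detect(old_chunk)
    wanted = repo_objs.compressor.decide(data)
    return compressor_cls.name != wanted.name or level != repo_objs.compressor.level
```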
@@ -2371,8 +2383,6 @@ class ArchiveRecreater:

     def create_target_archive(self, name):
         target = Archive(
-            self.repository,
-            self.key,
             self.manifest,
             name,
             create=True,
@@ -2384,4 +2394,4 @@ class ArchiveRecreater:
         return target

     def open_archive(self, name, **kwargs):
-        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
+        return Archive(self.manifest, name, cache=self.cache, **kwargs)
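Taken together, the new layering round-trips roughly like this (a sketch under the assumptions visible in this diff — `RepoObj(key)`, `format(id, {}, data)`, `parse(id, cdata)` returning `(meta, data)` — not an authoritative API reference):

```python
from borg.repoobj import RepoObj

def roundtrip(repository, key, payload):
    repo_objs = RepoObj(key)                    # format/parse live here now
    id_ = repo_objs.id_hash(payload)            # id over plaintext, as before
    cdata = repo_objs.format(id_, {}, payload)  # meta dict is empty for now
    repository.put(id_, cdata)
    meta, data = repo_objs.parse(id_, repository.get(id_))
    assert data == payload
    return meta
```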