@@ -296,7 +296,7 @@ class DownloadPipeline:
         """
         hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
-        for data in self.fetch_many(ids):
+        for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
@@ -318,9 +318,10 @@ class DownloadPipeline:
                         self.repository.preload([c.id for c in item.chunks])
                 yield item
 
-    def fetch_many(self, ids, is_preloaded=False):
+    def fetch_many(self, ids, is_preloaded=False, ro_type=None):
+        assert ro_type is not None
         for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
-            _, data = self.repo_objs.parse(id_, cdata)
+            _, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type)
             yield data
@@ -393,7 +394,9 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats
 
     def write_chunk(self, chunk):
-        id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
+        id_, _ = self.cache.add_chunk(
+            self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM
+        )
         logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
         self.cache.repository.async_response(wait=False)
         return id_
@@ -422,7 +425,7 @@ def archive_get_items(metadata, *, repo_objs, repository):
     assert "items" not in metadata
     items = []
     for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
-        _, data = repo_objs.parse(id, cdata)
+        _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
         ids = msgpack.unpackb(data)
         items.extend(ids)
     return items
@@ -440,9 +443,9 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer
         id = repo_objs.id_hash(data)
         logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}")
         if cache is not None and stats is not None:
-            cache.add_chunk(id, {}, data, stats=stats)
+            cache.add_chunk(id, {}, data, stats=stats, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
         elif add_reference is not None:
-            cdata = repo_objs.format(id, {}, data)
+            cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
             add_reference(id, len(data), cdata)
         else:
             raise NotImplementedError
@@ -531,8 +534,8 @@ class Archive:
 
     def _load_meta(self, id):
         cdata = self.repository.get(id)
-        _, data = self.repo_objs.parse(id, cdata)
-        archive, _ = self.key.unpack_and_verify_archive(data)
+        _, data = self.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)
+        archive = self.key.unpack_archive(data)
         metadata = ArchiveItem(internal_dict=archive)
         if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
             raise Exception("Unknown archive metadata version")
@@ -699,10 +702,10 @@ Duration: {0.duration}
         metadata.update({"size": stats.osize, "nfiles": stats.nfiles})
         metadata.update(additional_metadata or {})
         metadata = ArchiveItem(metadata)
-        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
+        data = self.key.pack_metadata(metadata.as_dict())
         self.id = self.repo_objs.id_hash(data)
         try:
-            self.cache.add_chunk(self.id, {}, data, stats=self.stats)
+            self.cache.add_chunk(self.id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         except IntegrityError as err:
             err_msg = str(err)
             # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
@@ -740,7 +743,7 @@ Duration: {0.duration}
         for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
             pi.show(increase=1)
             add(id)
-            _, data = self.repo_objs.parse(id, chunk)
+            _, data = self.repo_objs.parse(id, chunk, ro_type=ROBJ_ARCHIVE_STREAM)
             sync.feed(data)
         unique_size = archive_index.stats_against(cache.chunks)[1]
         pi.finish()
@@ -826,7 +829,9 @@ Duration: {0.duration}
                     # it would get stuck.
                     if "chunks" in item:
                         item_chunks_size = 0
-                        for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
+                        for data in self.pipeline.fetch_many(
+                            [c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
+                        ):
                             if pi:
                                 pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                             if stdout:
@@ -878,7 +883,7 @@ Duration: {0.duration}
                     fd = open(path, "wb")
                 with fd:
                     ids = [c.id for c in item.chunks]
-                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
+                    for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                         if pi:
                             pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                         with backup_io("write"):
@@ -1025,9 +1030,9 @@ Duration: {0.duration}
         setattr(metadata, key, value)
         if "items" in metadata:
             del metadata.items
-        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
+        data = self.key.pack_metadata(metadata.as_dict())
         new_id = self.key.id_hash(data)
-        self.cache.add_chunk(new_id, {}, data, stats=self.stats)
+        self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
         self.cache.chunk_decref(self.id, self.stats)
         self.id = new_id
@@ -1076,7 +1081,7 @@ Duration: {0.duration}
             for i, (items_id, data) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                 if progress:
                     pi.show(i)
-                _, data = self.repo_objs.parse(items_id, data)
+                _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
                 unpacker.feed(data)
                 chunk_decref(items_id, stats)
                 try:
@@ -1132,8 +1137,8 @@ Duration: {0.duration}
                path,
                item1,
                item2,
-                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])]),
-                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])]),
+                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
+                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
                can_compare_chunk_ids=can_compare_chunk_ids,
            )
@@ -1319,7 +1324,7 @@ class ChunksProcessor:
            started_hashing = time.monotonic()
            chunk_id, data = cached_hash(chunk, self.key.id_hash)
            stats.hashing_time += time.monotonic() - started_hashing
-            chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
+            chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM)
            self.cache.repository.async_response(wait=False)
            return chunk_entry
@@ -1898,7 +1903,7 @@ class ArchiveChecker:
                else:
                    try:
                        # we must decompress, so it'll call assert_id() in there:
-                        self.repo_objs.parse(chunk_id, encrypted_data, decompress=True)
+                        self.repo_objs.parse(chunk_id, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
                    except IntegrityErrorBase as integrity_error:
                        self.error_found = True
                        errors += 1
@@ -1930,7 +1935,7 @@ class ArchiveChecker:
                    try:
                        encrypted_data = self.repository.get(defect_chunk)
                        # we must decompress, so it'll call assert_id() in there:
-                        self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True)
+                        self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
                    except IntegrityErrorBase:
                        # failed twice -> get rid of this chunk
                        del self.chunks[defect_chunk]
@@ -1970,7 +1975,6 @@ class ArchiveChecker:
        # lost manifest on a older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
-        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        pi = ProgressIndicatorPercent(
            total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01, msgid="check.rebuild_manifest"
        )
@@ -1978,14 +1982,12 @@ class ArchiveChecker:
            pi.show()
            cdata = self.repository.get(chunk_id)
            try:
-                _, data = self.repo_objs.parse(chunk_id, cdata)
+                meta, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE)
            except IntegrityErrorBase as exc:
                logger.error("Skipping corrupted chunk: %s", exc)
                self.error_found = True
                continue
-            if not valid_msgpacked_dict(data, archive_keys_serialized):
-                continue
-            if b"command_line" not in data or b"\xa7version\x02" not in data:
+            if meta["type"] != ROBJ_ARCHIVE_META:
                continue
            try:
                archive = msgpack.unpackb(data)
@@ -1993,23 +1995,7 @@ class ArchiveChecker:
            except msgpack.UnpackException:
                continue
            if valid_archive(archive):
-                # **after** doing the low-level checks and having a strong indication that we
-                # are likely looking at an archive item here, also check the TAM authentication:
-                try:
-                    archive, _ = self.key.unpack_and_verify_archive(data)
-                except IntegrityError as integrity_error:
-                    # TAM issues - do not accept this archive!
-                    # either somebody is trying to attack us with a fake archive data or
-                    # we have an ancient archive made before TAM was a thing (borg < 1.0.9) **and** this repo
-                    # was not correctly upgraded to borg 1.2.5 (see advisory at top of the changelog).
-                    # borg can't tell the difference, so it has to assume this archive might be an attack
-                    # and drops this archive.
-                    name = archive.get(b"name", b"<unknown>").decode("ascii", "replace")
-                    logger.error("Archive TAM authentication issue for archive %s: %s", name, integrity_error)
-                    logger.error("This archive will *not* be added to the rebuilt manifest! It will be deleted.")
-                    self.error_found = True
-                    continue
-                # note: if we get here and verified is False, a TAM is not required.
+                archive = self.key.unpack_archive(data)
                archive = ArchiveItem(internal_dict=archive)
                name = archive.name
                logger.info("Found archive %s", name)
@@ -2043,7 +2029,7 @@ class ArchiveChecker:
 
        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
-            cdata = self.repo_objs.format(id_, {}, chunk)
+            cdata = self.repo_objs.format(id_, {}, chunk, ro_type=ROBJ_ARCHIVE_STREAM)
            add_reference(id_, len(chunk), cdata)
            return id_
@@ -2066,7 +2052,7 @@ class ArchiveChecker:
            def replacement_chunk(size):
                chunk = Chunk(None, allocation=CH_ALLOC, size=size)
                chunk_id, data = cached_hash(chunk, self.key.id_hash)
-                cdata = self.repo_objs.format(chunk_id, {}, data)
+                cdata = self.repo_objs.format(chunk_id, {}, data, ro_type=ROBJ_FILE_STREAM)
                return chunk_id, size, cdata
 
            offset = 0
@@ -2197,7 +2183,7 @@ class ArchiveChecker:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    try:
-                        _, data = self.repo_objs.parse(chunk_id, cdata)
+                        _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM)
                        unpacker.feed(data)
                        for item in unpacker:
                            valid, reason = valid_item(item)
@@ -2260,23 +2246,13 @@ class ArchiveChecker:
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            try:
-                _, data = self.repo_objs.parse(archive_id, cdata)
+                _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META)
            except IntegrityError as integrity_error:
                logger.error("Archive metadata block %s is corrupted: %s", bin_to_hex(archive_id), integrity_error)
                self.error_found = True
                del self.manifest.archives[info.name]
                continue
-            try:
-                archive, salt = self.key.unpack_and_verify_archive(data)
-            except IntegrityError as integrity_error:
-                # looks like there is a TAM issue with this archive, this might be an attack!
-                # when upgrading to borg 1.2.5, users are expected to TAM-authenticate all archives they
-                # trust, so there shouldn't be any without TAM.
-                logger.error("Archive TAM authentication issue for archive %s: %s", info.name, integrity_error)
-                logger.error("This archive will be *removed* from the manifest! It will be deleted.")
-                self.error_found = True
-                del self.manifest.archives[info.name]
-                continue
+            archive = self.key.unpack_archive(data)
            archive = ArchiveItem(internal_dict=archive)
            if archive.version != 2:
                raise Exception("Unknown archive metadata version")
@@ -2296,9 +2272,9 @@ class ArchiveChecker:
            archive.item_ptrs = archive_put_items(
                items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
            )
-            data = self.key.pack_and_authenticate_metadata(archive.as_dict(), context=b"archive", salt=salt)
+            data = self.key.pack_metadata(archive.as_dict())
            new_archive_id = self.key.id_hash(data)
-            cdata = self.repo_objs.format(new_archive_id, {}, data)
+            cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META)
            add_reference(new_archive_id, len(data), cdata)
            self.manifest.archives[info.name] = (new_archive_id, info.ts)
        pi.finish()
@@ -2434,13 +2410,13 @@ class ArchiveRecreater:
        chunk_id, data = cached_hash(chunk, self.key.id_hash)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
-        chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False)
+        chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
        self.cache.repository.async_response(wait=False)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry
 
    def iter_chunks(self, archive, target, chunks):
-        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks])
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM)
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)