@@ -391,6 +391,38 @@ def get_item_uid_gid(item, *, numeric, uid_forced=None, gid_forced=None, uid_def
     return uid, gid
 
 
+def archive_get_items(metadata, key, repository):
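+    """gets the (potentially large) list of archive metadata stream chunk ids, either via the item_ptrs repo objects (v2+) or directly from metadata.items (legacy v1)"""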
+    if "item_ptrs" in metadata:  # looks like a v2+ archive
+        assert "items" not in metadata
+        items = []
+        for id, data in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
+            data = key.decrypt(id, data)
+            ids = msgpack.unpackb(data)
+            items.extend(ids)
+        return items
+
+    if "items" in metadata:  # legacy, v1 archive
+        assert "item_ptrs" not in metadata
+        return metadata.items
+
+
+def archive_put_items(chunk_ids, *, key, cache=None, stats=None, add_reference=None):
+    """gets a (potentially large) list of archive metadata stream chunk ids and writes them to repo objects"""
+    item_ptrs = []
+    for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
+        data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
+        id = key.id_hash(data)
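+        # two write paths: via the chunks cache (archive creation) or via add_reference (e.g. check --repair)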
+        if cache is not None and stats is not None:
+            cache.add_chunk(id, data, stats)
+        elif add_reference is not None:
+            cdata = key.encrypt(id, data)
+            add_reference(id, len(data), cdata)
+        else:
+            raise NotImplementedError
+        item_ptrs.append(id)
+    return item_ptrs
+
+
 class Archive:
     class DoesNotExist(Error):
         """Archive {} does not exist"""
@@ -479,6 +511,8 @@ class Archive:
         metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
         if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
             raise Exception("Unknown archive metadata version")
+        # note: metadata.items must not get written to disk!
+        metadata.items = archive_get_items(metadata, self.key, self.repository)
         return metadata
 
     def load(self, id):
@@ -512,10 +546,6 @@ class Archive:
     def duration_from_meta(self):
         return format_timedelta(self.ts_end - self.ts)
 
-    def _archive_csize(self):
-        cdata = self.repository.get(self.id)
-        return len(cdata)
-
     def info(self):
         if self.create:
             stats = self.stats
@@ -532,7 +562,6 @@ class Archive:
             "end": OutputTimestamp(end),
             "duration": (end - start).total_seconds(),
             "stats": stats.as_dict(),
-            "limits": {"max_archive_size": self._archive_csize() / MAX_DATA_SIZE},
         }
         if self.create:
             info["command_line"] = sys.argv
@@ -556,12 +585,10 @@ Archive fingerprint: {0.fpr}
 Time (start): {start}
 Time (end):   {end}
 Duration: {0.duration}
-Utilization of max. archive size: {csize_max:.0%}
 """.format(
             self,
             start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)),
             end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)),
-            csize_max=self._archive_csize() / MAX_DATA_SIZE,
             location=self.repository._location.canonical_path(),
         )
 
@@ -599,6 +626,7 @@ Utilization of max. archive size: {csize_max:.0%}
         if name in self.manifest.archives:
             raise self.AlreadyExists(name)
         self.items_buffer.flush(flush=True)
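+        # write the (potentially large) list of item metadata stream chunk ids to separate repo objects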
+        item_ptrs = archive_put_items(self.items_buffer.chunks, key=self.key, cache=self.cache, stats=self.stats)
         duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
         if timestamp is None:
             end = datetime.utcnow()
@@ -612,7 +640,7 @@ Utilization of max. archive size: {csize_max:.0%}
             "version": 2,
             "name": name,
             "comment": comment or "",
-            "items": self.items_buffer.chunks,
+            "item_ptrs": item_ptrs,  # see #1473
             "cmdline": sys.argv,
             "hostname": hostname,
             "username": getuser(),
@@ -930,6 +958,8 @@ Utilization of max. archive size: {csize_max:.0%}
     def set_meta(self, key, value):
         metadata = self._load_meta(self.id)
         setattr(metadata, key, value)
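+        # the items list only gets loaded into memory by _load_meta, it must not be written back to the repo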
+        if "items" in metadata:
+            del metadata.items
         data = msgpack.packb(metadata.as_dict())
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, data, self.stats)
@@ -1004,6 +1034,11 @@ Utilization of max. archive size: {csize_max:.0%}
             if forced == 0:
                 raise
             error = True
+
+        # delete the blocks that store all the references that end up being loaded into metadata.items:
+        for id in self.metadata.item_ptrs:
+            chunk_decref(id, stats)
+
         # in forced delete mode, we try hard to delete at least the manifest entry,
         # if possible also the archive superblock, even if processing the items raises
         # some harmless exception.
@@ -1997,7 +2032,8 @@ class ArchiveChecker:
                 return True, ""
 
             i = 0
-            for state, items in groupby(archive.items, missing_chunk_detector):
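+            # fetch the item ids via archive_get_items, so both v1 and v2+ archives can be iterated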
+            archive_items = archive_get_items(archive, self.key, repository)
+            for state, items in groupby(archive_items, missing_chunk_detector):
                 items = list(items)
                 if state % 2:
                     for chunk_id in items:
@@ -2078,9 +2114,11 @@ class ArchiveChecker:
                         verify_file_chunks(info.name, item)
                     items_buffer.add(item)
                 items_buffer.flush(flush=True)
-                for previous_item_id in archive.items:
+                for previous_item_id in archive_get_items(archive, self.key, self.repository):
                     mark_as_possibly_superseded(previous_item_id)
-                archive.items = items_buffer.chunks
+                for previous_item_ptr in archive.item_ptrs:
+                    mark_as_possibly_superseded(previous_item_ptr)
+                archive.item_ptrs = archive_put_items(items_buffer.chunks, key=self.key, add_reference=add_reference)
                 data = msgpack.packb(archive.as_dict())
                 new_archive_id = self.key.id_hash(data)
                 cdata = self.key.encrypt(new_archive_id, data)