|
@@ -401,6 +401,7 @@ class CacheChunkBuffer(ChunkBuffer):
|
|
|
|
|
|
def write_chunk(self, chunk):
|
|
|
id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
|
|
|
+ logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
|
|
|
self.cache.repository.async_response(wait=False)
|
|
|
return id_
|
|
|
|
|
@@ -444,6 +445,7 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer
|
|
|
for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
|
|
|
data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
|
|
|
id = repo_objs.id_hash(data)
|
|
|
+ logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}")
|
|
|
if cache is not None and stats is not None:
|
|
|
cache.add_chunk(id, {}, data, stats=stats)
|
|
|
elif add_reference is not None:
|
|
@@ -648,9 +650,16 @@ Duration: {0.duration}
|
|
|
self.items_buffer.add(item)
|
|
|
|
|
|
def write_checkpoint(self):
|
|
|
- self.save(self.checkpoint_name)
|
|
|
+ metadata = self.save(self.checkpoint_name)
|
|
|
+ # that .save() has committed the repo.
|
|
|
+ # at next commit, we won't need this checkpoint archive any more because we will then
|
|
|
+ # have either a newer checkpoint archive or the final archive.
|
|
|
+ # so we can already remove it here, the next .save() will then commit this cleanup.
|
|
|
+ # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
|
|
|
del self.manifest.archives[self.checkpoint_name]
|
|
|
self.cache.chunk_decref(self.id, self.stats)
|
|
|
+ for id in metadata.item_ptrs:
|
|
|
+ self.cache.chunk_decref(id, self.stats)
|
|
|
|
|
|
def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
|
|
|
name = name or self.name
|
|
@@ -712,6 +721,7 @@ Duration: {0.duration}
|
|
|
self.manifest.write()
|
|
|
self.repository.commit(compact=False)
|
|
|
self.cache.commit()
|
|
|
+ return metadata
|
|
|
|
|
|
def calc_stats(self, cache, want_unique=True):
|
|
|
if not want_unique:
|
|
@@ -2165,7 +2175,7 @@ class ArchiveChecker:
|
|
|
if last and len(archive_infos) < last:
|
|
|
logger.warning("--last %d archives: only found %d archives", last, len(archive_infos))
|
|
|
else:
|
|
|
- archive_infos = self.manifest.archives.list(sort_by=sort_by)
|
|
|
+ archive_infos = self.manifest.archives.list(sort_by=sort_by, consider_checkpoints=True)
|
|
|
num_archives = len(archive_infos)
|
|
|
|
|
|
pi = ProgressIndicatorPercent(
|
|
@@ -2222,6 +2232,8 @@ class ArchiveChecker:
|
|
|
orphaned = unused - self.possibly_superseded
|
|
|
if orphaned:
|
|
|
logger.error(f"{len(orphaned)} orphaned objects found!")
|
|
|
+ for chunk_id in orphaned:
|
|
|
+ logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
|
|
|
self.error_found = True
|
|
|
if self.repair and unused:
|
|
|
logger.info(
|