@@ -643,14 +643,14 @@ Duration: {0.duration}
         # so we can already remove it here, the next .save() will then commit this cleanup.
         # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
         del self.manifest.archives[self.checkpoint_name]
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         for id in metadata.item_ptrs:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)
         # also get rid of that part item, we do not want to have it in next checkpoint or final archive
         tail_chunks = self.items_buffer.restore_chunks_state()
         # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
         for id in tail_chunks:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)  # TODO can we have real size here?

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
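
The hunk above widens the cache reference-counting API: chunk_decref() now takes the chunk's (plaintext) size in addition to its id, and call sites that do not know the real size pass 1 as a placeholder (see the TODO). Presumably this lets statistics be maintained even by a chunk index that does not store a size for every entry. A minimal, self-contained sketch of what a size-aware decref could look like; the classes below are illustrative stand-ins, not the project's real cache or statistics types:

    from collections import namedtuple

    ChunkIndexEntry = namedtuple("ChunkIndexEntry", "refcount size")  # illustrative stand-in

    class TinyStats:
        def __init__(self):
            self.osize = self.usize = 0  # original / unique ("deduplicated") size totals

        def update(self, size, unique):
            self.osize += size
            if unique:
                self.usize += size

    class TinyChunkCache:
        def __init__(self):
            self.chunks = {}  # id -> ChunkIndexEntry

        def chunk_decref(self, id, size, stats, wait=True):
            entry = self.chunks[id]  # raises KeyError for unknown ids, as the callers above expect
            if entry.refcount == 1:
                del self.chunks[id]  # last reference gone: the chunk can be deleted from the repo
                stats.update(-size, unique=True)
            else:
                self.chunks[id] = entry._replace(refcount=entry.refcount - 1)
                stats.update(-size, unique=False)
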
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         self.id = new_id

     def rename(self, name):
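
This hunk touches the metadata-update path: the new metadata chunk is added first, the manifest entry is repointed to it, and only then is the old archive chunk dereferenced (again with the placeholder size 1). A rough sketch of that write-new / swap-pointer / drop-old ordering, with hypothetical stand-ins for the cache, manifest, and key objects:

    def replace_archive_meta(cache, manifest, key, name, old_id, new_data, archive_time, stats):
        """Sketch: never drop the old metadata chunk before the new one is referenced."""
        new_id = key.id_hash(new_data)
        cache.add_chunk(new_id, {}, new_data, stats=stats)  # 1. store the new metadata chunk
        manifest.archives[name] = (new_id, archive_time)    # 2. repoint the manifest entry at it
        cache.chunk_decref(old_id, 1, stats)                # 3. only now drop the old chunk (real size unknown -> 1)
        return new_id
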
@@ -1052,12 +1052,15 @@ Duration: {0.duration}
                 error = True
                 return exception_ignored  # must not return None here

-        def chunk_decref(id, stats):
+        def chunk_decref(id, size, stats):
             try:
-                self.cache.chunk_decref(id, stats, wait=False)
+                self.cache.chunk_decref(id, size, stats, wait=False)
             except KeyError:
-                cid = bin_to_hex(id)
-                raise ChunksIndexError(cid)
+                nonlocal error
+                if forced == 0:
+                    cid = bin_to_hex(id)
+                    raise ChunksIndexError(cid)
+                error = True
             else:
                 fetch_async_response(wait=False)

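
The nested chunk_decref() helper above now tolerates missing chunk-index entries during a forced delete: only forced == 0 still raises ChunksIndexError; otherwise the helper sets the enclosing function's error flag and keeps going. A stripped-down, self-contained sketch of that nonlocal-flag pattern (the cache argument is only assumed to raise KeyError for unknown ids):

    def delete_chunks(cache, chunks, stats, forced=0):
        """Sketch: decref every chunk; in forced mode, missing index entries are noted, not fatal."""
        error = False

        def chunk_decref(id, size):
            nonlocal error
            try:
                cache.chunk_decref(id, size, stats, wait=False)
            except KeyError:
                if forced == 0:
                    raise        # strict mode: a missing chunk aborts the delete
                error = True     # forced mode: remember it, keep deleting the rest

        for id, size in chunks:
            chunk_decref(id, size)
        return not error         # False means "finished, but with problems"
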
@@ -1073,13 +1076,13 @@ Duration: {0.duration}
                     pi.show(i)
                 _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
                 unpacker.feed(data)
-                chunk_decref(items_id, stats)
+                chunk_decref(items_id, 1, stats)
                 try:
                     for item in unpacker:
                         item = Item(internal_dict=item)
                         if "chunks" in item:
                             for chunk_id, size in item.chunks:
-                                chunk_decref(chunk_id, stats)
+                                chunk_decref(chunk_id, size, stats)
                 except (TypeError, ValueError):
                     # if items metadata spans multiple chunks and one chunk got dropped somehow,
                     # it could be that unpacker yields bad types
@@ -1096,12 +1099,12 @@ Duration: {0.duration}

         # delete the blocks that store all the references that end up being loaded into metadata.items:
         for id in self.metadata.item_ptrs:
-            chunk_decref(id, stats)
+            chunk_decref(id, 1, stats)

         # in forced delete mode, we try hard to delete at least the manifest entry,
         # if possible also the archive superblock, even if processing the items raises
         # some harmless exception.
-        chunk_decref(self.id, stats)
+        chunk_decref(self.id, 1, stats)
         del self.manifest.archives[self.name]
         while fetch_async_response(wait=True) is not None:
             # we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1513,7 @@ class FilesystemObjectProcessors:
         except BackupOSError:
             # see comments in process_file's exception handler, same issue here.
             for chunk in item.get("chunks", []):
-                cache.chunk_decref(chunk.id, self.stats, wait=False)
+                cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
             raise
         else:
             item.get_size(memorize=True)
@@ -1544,7 +1547,7 @@ class FilesystemObjectProcessors:
                     item.chunks = []
                     for chunk_id, chunk_size in hl_chunks:
                         # process one-by-one, so we will know in item.chunks how far we got
-                        chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                        chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
                         item.chunks.append(chunk_entry)
                 else:  # normal case, no "2nd+" hardlink
                     if not is_special_file:
@@ -1552,26 +1555,26 @@ class FilesystemObjectProcessors:
                         started_hashing = time.monotonic()
                         path_hash = self.key.id_hash(hashed_path)
                         self.stats.hashing_time += time.monotonic() - started_hashing
-                        known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
+                        known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                     else:
                         # in --read-special mode, we may be called for special files.
                         # there should be no information in the cache about special files processed in
                         # read-special mode, but we better play safe as this was wrong in the past:
                         hashed_path = path_hash = None
-                        known, ids = False, None
-                    if ids is not None:
+                        known, chunks = False, None
+                    if chunks is not None:
                         # Make sure all ids are available
-                        for id_ in ids:
-                            if not cache.seen_chunk(id_):
+                        for chunk in chunks:
+                            if not cache.seen_chunk(chunk.id):
                                 # cache said it is unmodified, but we lost a chunk: process file like modified
                                 status = "M"
                                 break
                         else:
                             item.chunks = []
-                            for chunk_id in ids:
+                            for chunk in chunks:
                                 # process one-by-one, so we will know in item.chunks how far we got
-                                chunk_entry = cache.chunk_incref(chunk_id, self.stats)
-                                item.chunks.append(chunk_entry)
+                                cache.chunk_incref(chunk.id, chunk.size, self.stats)
+                                item.chunks.append(chunk)
                             status = "U"  # regular file, unchanged
                     else:
                         status = "M" if known else "A"  # regular file, modified or added
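
In the hunks above, cache.file_known_and_unchanged() apparently no longer returns bare chunk ids but chunk-list entries that carry both id and size, so the unchanged-file fast path can call chunk_incref(chunk.id, chunk.size, ...) and reuse the entries directly as item.chunks. A hedged sketch of that fast path; the ChunkListEntry shape is an assumption made for illustration:

    from collections import namedtuple

    ChunkListEntry = namedtuple("ChunkListEntry", "id size")  # assumed shape of a files-cache chunk entry

    def reuse_unchanged_file(cache, chunks, item, stats):
        """Sketch: re-reference all chunks of a file the files cache says is unchanged."""
        if any(not cache.seen_chunk(chunk.id) for chunk in chunks):
            return "M"  # a referenced chunk went missing: treat the file as modified
        item.chunks = []
        for chunk in chunks:
            cache.chunk_incref(chunk.id, chunk.size, stats)  # bump refcount, account the known size
            item.chunks.append(chunk)
        return "U"  # unchanged
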
@@ -1606,7 +1609,7 @@ class FilesystemObjectProcessors:
                         # block or char device will change without its mtime/size/inode changing.
                         # also, we must not memorize a potentially inconsistent/corrupt file that
                         # changed while we backed it up.
-                        cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
+                        cache.memorize_file(hashed_path, path_hash, st, item.chunks)
                     self.stats.files_stats[status] += 1  # must be done late
                     if not changed_while_backup:
                         status = None  # we already called print_file_status
@@ -1620,7 +1623,7 @@ class FilesystemObjectProcessors:
                 # but we will not add an item (see add_item in create_helper) and thus
                 # they would be orphaned chunks in case that we commit the transaction.
                 for chunk in item.get("chunks", []):
-                    cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 # Now that we have cleaned up the chunk references, we can re-raise the exception.
                 # This will skip processing of this file, but might retry or continue with the next one.
                 raise
@@ -1731,7 +1734,7 @@ class TarfileObjectProcessors:
             except BackupOSError:
                 # see comment in FilesystemObjectProcessors.process_file, same issue here.
                 for chunk in item.get("chunks", []):
-                    self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 raise

@@ -2328,10 +2331,10 @@ class ArchiveChecker:
             unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
             orphaned = unused - self.possibly_superseded
             if orphaned:
-                logger.error(f"{len(orphaned)} orphaned objects found!")
+                logger.info(f"{len(orphaned)} orphaned (unused) objects found.")
                 for chunk_id in orphaned:
                     logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
-                self.error_found = True
+                # To support working with AdHocCache or AdHocWithFilesCache, we do not set self.error_found = True.
             if self.repair and unused:
                 logger.info(
                     "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
@@ -2444,7 +2447,7 @@ class ArchiveRecreater:
     def process_chunks(self, archive, target, item):
         if not target.recreate_rechunkify:
             for chunk_id, size in item.chunks:
-                self.cache.chunk_incref(chunk_id, target.stats)
+                self.cache.chunk_incref(chunk_id, size, target.stats)
             return item.chunks
         chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         chunk_processor = partial(self.chunk_processor, target)
@@ -2452,8 +2455,9 @@ class ArchiveRecreater:

     def chunk_processor(self, target, chunk):
         chunk_id, data = cached_hash(chunk, self.key.id_hash)
+        size = len(data)
         if chunk_id in self.seen_chunks:
-            return self.cache.chunk_incref(chunk_id, target.stats)
+            return self.cache.chunk_incref(chunk_id, size, target.stats)
         chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
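
In the recreate path above, the plaintext size is now taken directly from the chunk data (size = len(data)) before choosing between chunk_incref() for an already-seen chunk and add_chunk() for a new one. A small sketch of that dedup decision, assuming a cache object with the two calls used above:

    def process_chunk(cache, key, seen_chunks, stats, data):
        """Sketch: incref a chunk already stored in this run, otherwise add it once."""
        chunk_id = key.id_hash(data)
        size = len(data)  # plaintext size is known here, because we hold the data
        if chunk_id in seen_chunks:
            return cache.chunk_incref(chunk_id, size, stats)
        entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
        seen_chunks.add(chunk_id)
        return entry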