@@ -396,9 +396,7 @@ class FilesCacheMixin:
         assert isinstance(entry, FileCacheEntry)
         compressed_chunks = []
         for id, size in entry.chunks:
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]  # may raise KeyError if chunk id is not in repo
             if cie.size == 0:  # size is not known in the chunks index yet
                 self.chunks[id] = cie._replace(size=size)
             else:
@@ -418,9 +416,7 @@ class FilesCacheMixin:
         for idx in entry.chunks:
             assert isinstance(idx, int), f"{idx} is not an int"
             id = self.chunks.idx_to_k(idx)
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]
             assert cie.size > 0
             chunks.append((id, cie.size))
         entry = entry._replace(chunks=chunks)
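
The two hunks above form the compress/decompress pair of the files cache: compress_entry maps each (chunk id, size) tuple to a small integer index into the chunks index, and decompress_entry maps it back, so a cached file entry stores 32-bit indexes instead of 256-bit ids plus sizes. Below is a reduced sketch of that round trip; ToyIndex and its size() helper are illustrative stand-ins, not the real borghash ChunkIndex API.

class ToyIndex:
    """Simplified stand-in for ChunkIndex: maps chunk ids to sizes and to small integer indexes."""

    def __init__(self, sizes):
        self._keys = list(sizes)      # insertion-ordered chunk ids
        self._sizes = dict(sizes)     # chunk id -> size

    def k_to_idx(self, key):
        return self._keys.index(key)  # raises ValueError for unknown ids

    def idx_to_k(self, idx):
        return self._keys[idx]

    def size(self, key):
        return self._sizes[key]


index = ToyIndex({b"id-a": 100, b"id-b": 200})
chunks = [(b"id-a", 100), (b"id-b", 200)]

# compress: (chunk id, size) tuples -> small integer indexes into the chunks index
compressed = [index.k_to_idx(id) for id, _ in chunks]
assert compressed == [0, 1]

# decompress: indexes -> (chunk id, size) tuples, sizes looked up in the chunks index
restored = [(index.idx_to_k(i), index.size(index.idx_to_k(i))) for i in compressed]
assert restored == chunks
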
@@ -485,6 +481,7 @@ class FilesCacheMixin:
             mtime=int_to_timestamp(mtime_ns),
             chunks=item.chunks,
         )
+        # note: if the repo is in a valid state, the next line should not fail with KeyError:
         files[path_hash] = self.compress_entry(entry)
         # deal with special snapshot / timestamp granularity case, see FAQ:
         for path_hash in self._newest_path_hashes:
@@ -529,7 +526,11 @@ class FilesCacheMixin:
                         for path_hash, entry in u:
                             entry = FileCacheEntry(*entry)
                             entry = entry._replace(age=entry.age + 1)
-                            files[path_hash] = self.compress_entry(entry)
+                            try:
+                                files[path_hash] = self.compress_entry(entry)
+                            except KeyError:
+                                # repo is missing a chunk referenced from entry
+                                logger.debug(f"compress_entry failed for {entry}, skipping.")
                     except (TypeError, ValueError) as exc:
                         msg = "The files cache seems invalid. [%s]" % str(exc)
                         break
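
The change in the last two hunks makes a missing chunk a recoverable condition: compress_entry now lets the KeyError from the direct lookup propagate, and the files cache reader skips the affected entry instead of asserting. Here is a minimal sketch of that skip-on-KeyError pattern; the chunk_index dict and this compress_entry are heavily reduced stand-ins, not the real implementations.

from collections import namedtuple

FileCacheEntry = namedtuple("FileCacheEntry", "age chunks")

chunk_index = {b"id-1": 100}  # stand-in for the real ChunkIndex; b"id-2" is missing
cache_entries = {
    b"path-a": FileCacheEntry(age=0, chunks=[(b"id-1", 100)]),
    b"path-b": FileCacheEntry(age=0, chunks=[(b"id-2", 200)]),
}

def compress_entry(entry):
    # like the real method after this change: plain indexing, so a missing chunk raises KeyError
    return entry._replace(chunks=[chunk_index[id] for id, _ in entry.chunks])

files = {}
for path_hash, entry in cache_entries.items():
    try:
        files[path_hash] = compress_entry(entry)
    except KeyError:
        pass  # entry references a chunk the repo does not have -> drop it from the files cache

assert list(files) == [b"path-a"]
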
@@ -706,14 +707,23 @@ CHUNKINDEX_HASH_SEED = 2
 def write_chunkindex_to_repo_cache(
     repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
 ):
-    cached_hashes = list_chunkindex_hashes(repository)
+    # the borghash code has no means to only serialize the F_NEW table entries,
+    # thus we copy only the new entries to a temporary table:
+    new_chunks = ChunkIndex()
+    # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs):
+    cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0)
+    for key, _ in chunks.iteritems(only_new=True):
+        new_chunks[key] = cleaned_value
     with io.BytesIO() as f:
-        chunks.write(f)
+        new_chunks.write(f)
         data = f.getvalue()
+    logger.debug(f"caching {len(new_chunks)} new chunks.")
+    new_chunks.clear()  # free memory of the temporary table
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
     new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    cached_hashes = list_chunkindex_hashes(repository)
     if force_write or new_hash not in cached_hashes:
         # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
@@ -725,12 +735,15 @@ def write_chunkindex_to_repo_cache(
         cache_name = f"cache/chunks.{new_hash}"
         logger.debug(f"caching chunks index as {cache_name} in repository...")
         repository.store_store(cache_name, data)
+        # we have successfully stored to the repository, so we can clear all F_NEW flags now:
+        chunks.clear_new()
+        # delete cached chunk indexes that are not needed anymore, but never the one we just wrote:
         if delete_other:
-            delete_these = cached_hashes
+            delete_these = set(cached_hashes) - {new_hash}
         elif delete_these:
-            pass
+            delete_these = set(delete_these) - {new_hash}
         else:
-            delete_these = []
+            delete_these = set()
         for hash in delete_these:
             cache_name = f"cache/chunks.{hash}"
             try:
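
With the two hunks above, write_chunkindex_to_repo_cache serializes only the F_NEW entries and, after a successful store, clears those flags and normalizes the deletion candidates to a set that never contains the hash it just wrote. The set arithmetic for the deletion candidates, pulled out into a tiny self-contained sketch (select_deletions is a hypothetical helper used only for illustration, not part of borg):

def select_deletions(cached_hashes, new_hash, *, delete_other=False, delete_these=None):
    """Return the set of cached chunk index hashes to delete, never including new_hash."""
    if delete_other:
        delete_these = set(cached_hashes) - {new_hash}
    elif delete_these:
        delete_these = set(delete_these) - {new_hash}
    else:
        delete_these = set()
    return delete_these

assert select_deletions(["a", "b", "n"], "n", delete_other=True) == {"a", "b"}
assert select_deletions(["a", "b"], "n", delete_these=["n", "b"]) == {"b"}
assert select_deletions(["a"], "n") == set()
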
@@ -783,6 +796,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
                 write_chunkindex_to_repo_cache(
                     repository, chunks, clear=False, force_write=True, delete_these=hashes
                 )
+            else:
+                chunks.clear_new()
             return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
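
Taken together, the F_NEW handling forms a small protocol: entries added since the last upload are flagged as new, only those get serialized and stored, and the flags are cleared once the store succeeded (or, as in the last hunk, once a cache load shows there is nothing that needs writing). A toy model of that lifecycle; ToyChunkIndex only mimics the iteritems(only_new=True)/clear_new() surface seen in this diff and is not the real borghash class.

class ToyChunkIndex:
    """Minimal stand-in for ChunkIndex: remembers which keys are 'new' since the last upload."""

    def __init__(self):
        self._data = {}
        self._new = set()

    def __setitem__(self, key, value):
        self._data[key] = value
        self._new.add(key)

    def iteritems(self, only_new=False):
        keys = self._new if only_new else self._data.keys()
        return [(k, self._data[k]) for k in keys]

    def clear_new(self):
        self._new.clear()


idx = ToyChunkIndex()
idx[b"chunk-1"] = 100
idx[b"chunk-2"] = 200

# "upload": serialize only the entries flagged as new, then clear the flags on success
assert len(idx.iteritems(only_new=True)) == 2
idx.clear_new()

# nothing was added since the last upload -> nothing new to serialize next time
assert idx.iteritems(only_new=True) == []
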