Przeglądaj źródła

when building the chunk index, merge all we have in the cache

Thomas Waldmann 11 miesięcy temu
rodzic
commit
00f8cdc9a7
1 zmienionych plików z 36 dodań i 19 usunięć
  1. 36 19
      src/borg/cache.py

+ 36 - 19
src/borg/cache.py

@@ -739,28 +739,45 @@ def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_wri
     return new_hash
 
 
+def read_chunkindex_from_repo_cache(repository, hash):
+    cache_name = f"cache/chunks.{hash}"
+    logger.debug(f"trying to load {cache_name} from the repo...")
+    try:
+        chunks_data = repository.store_load(cache_name)
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        logger.debug(f"{cache_name} not found in the repository.")
+    else:
+        if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+            logger.debug(f"{cache_name} is valid.")
+            with io.BytesIO(chunks_data) as f:
+                chunks = ChunkIndex.read(f)
+            return chunks
+        else:
+            logger.debug(f"{cache_name} is invalid.")
+
+
 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
     try_upgrade_to_b14(repository)
-    # first, try to load a pre-computed and centrally cached chunks index:
+    # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes:
     if not disable_caches:
         hashes = list_chunkindex_hashes(repository)
-        assert len(hashes) <= 1, f"chunk indexes: {hashes}"  # later we change to multiple chunkindexes...
-        for hash in hashes:
-            cache_name = f"cache/chunks.{hash}"
-            logger.debug(f"trying to load {cache_name} from the repo...")
-            try:
-                chunks_data = repository.store_load(cache_name)
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                logger.debug(f"{cache_name} not found in the repository.")
-            else:
-                if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
-                    logger.debug(f"{cache_name} is valid.")
-                    with io.BytesIO(chunks_data) as f:
-                        chunks = ChunkIndex.read(f)
-                    return chunks
-                else:
-                    logger.debug(f"{cache_name} is invalid.")
+        if hashes:  # we have at least one cached chunk index!
+            merged = 0
+            chunks = ChunkIndex()  # we'll merge all we find into this
+            for hash in hashes:
+                chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash)
+                if chunks_to_merge is not None:
+                    logger.debug(f"cached chunk index {hash} gets merged...")
+                    for k, v in chunks_to_merge.items():
+                        chunks[k] = v
+                    merged += 1
+                    chunks_to_merge.clear()
+            if merged > 0:
+                if merged > 1 and cache_immediately:
+                    # immediately update cache/chunks, so we don't have to merge these again:
+                    write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
+                return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
@@ -858,7 +875,7 @@ class ChunksMixin:
 
     def _write_chunks_cache(self, chunks):
         # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=True)
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=False)
         self._chunks = None  # nothing there (cleared!)
 
     def refresh_lock(self, now):