
Merge pull request #8531 from ThomasWaldmann/chunkindex

chunk index cache: use cache/chunks.<HASH>, see #8503
TW, 6 months ago
parent commit d266b2fd2d
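
In short: instead of one fixed object name (cache/chunks) plus a separate cache/chunks_hash object, the serialized chunk index is now stored under a name that embeds its own xxh64 digest, cache/chunks.<HASH>. A minimal standalone sketch of the naming idea, using the third-party xxhash package as a stand-in for borg.checksums.xxh64 (the helper name is illustrative, not borg's API):

    import xxhash  # stand-in for borg.checksums.xxh64

    CHUNKINDEX_HASH_SEED = 2  # same constant as in src/borg/cache.py

    def cache_name(serialized_index: bytes) -> str:
        # the object name embeds the digest of the serialized index, so it
        # identifies the index and doubles as an integrity check on load
        digest = xxhash.xxh64(serialized_index, seed=CHUNKINDEX_HASH_SEED).hexdigest()
        return f"cache/chunks.{digest}"

Because the digest is part of the name, several index versions can coexist in the store, and "is my cached copy current?" becomes a name comparison instead of a separate hash-object read.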

+ 4 - 13
src/borg/archive.py

@@ -22,7 +22,7 @@ logger = create_logger()
 
 from . import xattr
 from .chunker import get_chunker, Chunk
-from .cache import ChunkListEntry, build_chunkindex_from_repo
+from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
 from .crypto.key import key_factory, UnsupportedPayloadError
 from .compress import CompressionSpec
 from .constants import *  # NOQA
@@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError, StoreObjectNotFound
+from .repository import Repository, NoManifestError
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -2140,18 +2140,9 @@ class ArchiveChecker:
 
     def finish(self):
         if self.repair:
+            # we may have deleted chunks, remove the chunks index cache!
             logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
-            # we may have deleted chunks, invalidate/remove the chunks index cache!
-            try:
-                self.repository.store_delete("cache/chunks_hash")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
-            try:
-                self.repository.store_delete("cache/chunks")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
+            delete_chunkindex_cache(self.repository)
             logger.info("Writing Manifest.")
             self.manifest.write()
 
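With the helper in place, the repair path no longer hand-rolls store_delete calls with duplicated try/except blocks; invalidating the repo-side cache is one call. A hedged caller-side sketch (delete_chunkindex_cache is the helper added in src/borg/cache.py below; the wrapper function here is illustrative):

    from borg.cache import delete_chunkindex_cache

    def after_chunk_deletion(repository):
        # any code path that may have deleted chunks drops every
        # cache/chunks.<hash> object, so all clients rebuild their index
        delete_chunkindex_cache(repository)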

+ 1 - 1
src/borg/archiver/compact_cmd.py

@@ -65,7 +65,7 @@ class ArchiveGarbageCollector:
             # as we put the wrong size in there, we need to clean up the size:
             self.chunks[id] = entry._replace(size=0)
         # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
-        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True)
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
         self.chunks = None  # nothing there (cleared!)
 
     def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
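borg compact ends up holding a complete, up-to-date ChunkIndex, so it passes delete_other=True: write the fresh index under its digest name, then remove every other cached index. An illustrative model of that effect, with a plain dict standing in for the borgstore store:

    def write_replacing_others(store: dict, new_name: str, data: bytes) -> None:
        # model of write_chunkindex_to_repo_cache(..., delete_other=True)
        superseded = [name for name in store if name.startswith("cache/chunks.")]
        store[new_name] = data           # write the fresh, complete index first
        for name in superseded:
            if name != new_name:
                store.pop(name, None)    # then drop all other cached indexes

Writing before deleting mirrors the real ordering, so there is never a moment with no cached index in the store at all.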

+ 103 - 39
src/borg/cache.py

@@ -13,6 +13,8 @@ logger = create_logger()
 
 files_cache_logger = create_logger("borg.debug.files_cache")
 
+from borgstore.store import ItemInfo
+
 from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
 from .checksums import xxh64
 from .hashindex import ChunkIndex, ChunkIndexEntry
@@ -663,63 +665,125 @@ class FilesCacheMixin:
         )
 
 
-def load_chunks_hash(repository) -> bytes:
-    try:
-        hash = repository.store_load("cache/chunks_hash")
-        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
-    except (Repository.ObjectNotFound, StoreObjectNotFound):
-        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-        hash = b""
-        logger.debug("cache/chunks_hash missing!")
-    return hash
+def try_upgrade_to_b14(repository):
+    # TODO: remove this before 2.0.0 release
+    # we just delete any present chunk index cache here, it is invalid due to the
+    # refcount -> flags change we did and due to the different CHUNKINDEX_HASH_SEED.
+    for name in "chunks_hash", "chunks":
+        try:
+            repository.store_delete(f"cache/{name}")
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            pass  # likely already upgraded
+
+
+def list_chunkindex_hashes(repository):
+    hashes = []
+    for info in repository.store_list("cache"):
+        info = ItemInfo(*info)  # RPC does not give namedtuple
+        if info.name.startswith("chunks."):
+            hash = info.name.removeprefix("chunks.")
+            hashes.append(hash)
+    hashes = sorted(hashes)
+    logger.debug(f"cached chunk indexes: {hashes}")
+    return hashes
+
+
+def delete_chunkindex_cache(repository):
+    hashes = list_chunkindex_hashes(repository)
+    for hash in hashes:
+        cache_name = f"cache/chunks.{hash}"
+        try:
+            repository.store_delete(cache_name)
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            pass
+    logger.debug(f"cached chunk indexes deleted: {hashes}")
 
 
 CHUNKINDEX_HASH_SEED = 2
 
 
-def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False):
-    cached_hash = load_chunks_hash(repository)
+def write_chunkindex_to_repo_cache(
+    repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
+):
+    cached_hashes = list_chunkindex_hashes(repository)
     with io.BytesIO() as f:
         chunks.write(f)
         data = f.getvalue()
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
-    new_hash = xxh64(data, seed=CHUNKINDEX_HASH_SEED)
-    if force_write or new_hash != cached_hash:
-        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+    new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    if force_write or new_hash not in cached_hashes:
+        # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
-        # hash against cache/chunks_hash in the repository. if it is the same, the cache
-        # is valid. If it is different, the cache is either corrupted or out of date and
-        # has to be discarded.
-        # when some functionality is DELETING chunks from the repository, it has to either update
-        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # hash against the hash in its name. if it is the same, the cache is valid.
+        # if it is different, the cache is either corrupted or out of date and has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to delete
+        # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
         # so that all clients will discard any client-local chunks index caches.
-        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
-        repository.store_store("cache/chunks", data)
-        repository.store_store("cache/chunks_hash", new_hash)
+        cache_name = f"cache/chunks.{new_hash}"
+        logger.debug(f"caching chunks index as {cache_name} in repository...")
+        repository.store_store(cache_name, data)
+        if delete_other:
+            delete_these = cached_hashes
+        elif delete_these:
+            pass
+        else:
+            delete_these = []
+        for hash in delete_these:
+            cache_name = f"cache/chunks.{hash}"
+            try:
+                repository.store_delete(cache_name)
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+        if delete_these:
+            logger.debug(f"cached chunk indexes deleted: {delete_these}")
     return new_hash
 
 
+def read_chunkindex_from_repo_cache(repository, hash):
+    cache_name = f"cache/chunks.{hash}"
+    logger.debug(f"trying to load {cache_name} from the repo...")
+    try:
+        chunks_data = repository.store_load(cache_name)
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        logger.debug(f"{cache_name} not found in the repository.")
+    else:
+        if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+            logger.debug(f"{cache_name} is valid.")
+            with io.BytesIO(chunks_data) as f:
+                chunks = ChunkIndex.read(f)
+            return chunks
+        else:
+            logger.debug(f"{cache_name} is invalid.")
+
+
 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
-    chunks = None
-    # first, try to load a pre-computed and centrally cached chunks index:
+    try_upgrade_to_b14(repository)
+    # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes:
     if not disable_caches:
-        wanted_hash = load_chunks_hash(repository)
-        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
-        try:
-            chunks_data = repository.store_load("cache/chunks")
-        except (Repository.ObjectNotFound, StoreObjectNotFound):
-            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-            logger.debug("cache/chunks not found in the repository.")
-        else:
-            if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == wanted_hash:
-                logger.debug("cache/chunks is valid.")
-                with io.BytesIO(chunks_data) as f:
-                    chunks = ChunkIndex.read(f)
+        hashes = list_chunkindex_hashes(repository)
+        if hashes:  # we have at least one cached chunk index!
+            merged = 0
+            chunks = ChunkIndex()  # we'll merge all we find into this
+            for hash in hashes:
+                chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash)
+                if chunks_to_merge is not None:
+                    logger.debug(f"cached chunk index {hash} gets merged...")
+                    for k, v in chunks_to_merge.items():
+                        chunks[k] = v
+                    merged += 1
+                    chunks_to_merge.clear()
+            if merged > 0:
+                if merged > 1 and cache_immediately:
+                    # immediately update cache/chunks, so we don't have to merge these again:
+                    write_chunkindex_to_repo_cache(
+                        repository, chunks, clear=False, force_write=True, delete_these=hashes
+                    )
                 return chunks
-            else:
-                logger.debug("cache/chunks is invalid.")
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
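The merge loop above treats each surviving cache/chunks.<hash> object as a partial view and folds them all into one in-memory index. A runnable model of just the merge step, with plain dicts standing in for ChunkIndex (hash verification and RPC details omitted):

    def merge_cached_indexes(partials):
        # hedged model of the merge loop in build_chunkindex_from_repo above
        chunks, merged = {}, 0
        for partial in partials:
            for k, v in partial.items():
                chunks[k] = v    # later entries win, as in the real loop
            merged += 1
            partial.clear()      # free memory immediately, like the original
        return chunks if merged else None

If more than one index was merged (and cache_immediately is set), the result is written straight back as a single new cache/chunks.<hash> and the merged inputs are deleted, so the next client finds one consolidated index.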
@@ -741,7 +805,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
-        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True)
+        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
     return chunks
 
 
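On the read side, validity checking is now self-contained: hash the loaded bytes and compare against the digest embedded in the object's name; a mismatch means the object is corrupted or stale and gets discarded. A minimal sketch of that check (xxhash again standing in for borg.checksums.xxh64; the function name is illustrative):

    import xxhash

    CHUNKINDEX_HASH_SEED = 2

    def cached_index_is_valid(name: str, data: bytes) -> bool:
        # name is "cache/chunks.<hex xxh64 of data>" if the object is intact
        expected = name.removeprefix("cache/chunks.")
        return xxhash.xxh64(data, seed=CHUNKINDEX_HASH_SEED).hexdigest() == expected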

+ 1 - 1
src/borg/repository.py

@@ -385,7 +385,7 @@ class Repository:
                     # if we did a full pass in one go, we built a complete, uptodate ChunkIndex, cache it!
                     from .cache import write_chunkindex_to_repo_cache
 
-                    write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True)
+                    write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True, delete_other=True)
         except StoreObjectNotFound:
             # it can be that there is no "data/" at all, then it crashes when iterating infos.
             pass
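
The same delete_other=True shows up here because a full check pass, like compact, ends with a complete and current index: writing it under its digest name and deleting every other cache/chunks.* object leaves exactly one valid cached index behind. A self-contained toy run of that invariant (plain dict store; names and helpers are illustrative, not borg's API):

    import xxhash  # stand-in for borg.checksums.xxh64

    SEED = 2  # CHUNKINDEX_HASH_SEED

    def name_of(data: bytes) -> str:
        return "cache/chunks." + xxhash.xxh64(data, seed=SEED).hexdigest()

    store = {}
    for data in (b"index v1", b"index v2"):          # two successive full passes
        superseded = [n for n in store if n.startswith("cache/chunks.")]
        store[name_of(data)] = data                  # write the new index first
        for n in superseded:
            store.pop(n, None)                       # then drop superseded ones
    (only_name,) = store                             # exactly one index remains
    assert name_of(store[only_name]) == only_name    # and its name verifies it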