Browse Source

Merge pull request #8541 from ThomasWaldmann/incremental-chunkindex-cache-updates

enable partial/incremental ChunkIndex cache updates
TW 6 months ago
parent
commit
b6ae924f30

+ 2 - 7
src/borg/archiver/compact_cmd.py

@@ -58,13 +58,8 @@ class ArchiveGarbageCollector:
         return chunks
 
     def save_chunk_index(self):
-        # first clean up:
-        for id, entry in self.chunks.iteritems():
-            # we already deleted the unused chunks, so everything left must be used:
-            assert entry.flags & ChunkIndex.F_USED
-            # as we put the wrong size in there, we need to clean up the size:
-            self.chunks[id] = entry._replace(size=0)
-        # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
+        # write_chunkindex_to_repo now removes all flags and size infos.
+        # we need this, as we put the wrong size in there.
         write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
         self.chunks = None  # nothing there (cleared!)
 

+ 37 - 17
src/borg/cache.py

@@ -396,9 +396,7 @@ class FilesCacheMixin:
         assert isinstance(entry, FileCacheEntry)
         compressed_chunks = []
         for id, size in entry.chunks:
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]  # may raise KeyError if chunk id is not in repo
             if cie.size == 0:  # size is not known in the chunks index yet
                 self.chunks[id] = cie._replace(size=size)
             else:
@@ -418,9 +416,7 @@ class FilesCacheMixin:
         for idx in entry.chunks:
             assert isinstance(idx, int), f"{idx} is not an int"
             id = self.chunks.idx_to_k(idx)
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]
             assert cie.size > 0
             chunks.append((id, cie.size))
         entry = entry._replace(chunks=chunks)
@@ -485,6 +481,7 @@ class FilesCacheMixin:
                     mtime=int_to_timestamp(mtime_ns),
                     chunks=item.chunks,
                 )
+                # note: if the repo is an a valid state, next line should not fail with KeyError:
                 files[path_hash] = self.compress_entry(entry)
         # deal with special snapshot / timestamp granularity case, see FAQ:
         for path_hash in self._newest_path_hashes:
@@ -529,7 +526,11 @@ class FilesCacheMixin:
                         for path_hash, entry in u:
                             entry = FileCacheEntry(*entry)
                             entry = entry._replace(age=entry.age + 1)
-                            files[path_hash] = self.compress_entry(entry)
+                            try:
+                                files[path_hash] = self.compress_entry(entry)
+                            except KeyError:
+                                # repo is missing a chunk referenced from entry
+                                logger.debug(f"compress_entry failed for {entry}, skipping.")
                     except (TypeError, ValueError) as exc:
                         msg = "The files cache seems invalid. [%s]" % str(exc)
                         break
@@ -706,14 +707,23 @@ CHUNKINDEX_HASH_SEED = 2
 def write_chunkindex_to_repo_cache(
     repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
 ):
-    cached_hashes = list_chunkindex_hashes(repository)
+    # the borghash code has no means to only serialize the F_NEW table entries,
+    # thus we copy only the new entries to a temporary table:
+    new_chunks = ChunkIndex()
+    # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs):
+    cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0)
+    for key, _ in chunks.iteritems(only_new=True):
+        new_chunks[key] = cleaned_value
     with io.BytesIO() as f:
-        chunks.write(f)
+        new_chunks.write(f)
         data = f.getvalue()
+    logger.debug(f"caching {len(new_chunks)} new chunks.")
+    new_chunks.clear()  # free memory of the temporary table
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
     new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    cached_hashes = list_chunkindex_hashes(repository)
     if force_write or new_hash not in cached_hashes:
         # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
@@ -725,12 +735,15 @@ def write_chunkindex_to_repo_cache(
         cache_name = f"cache/chunks.{new_hash}"
         logger.debug(f"caching chunks index as {cache_name} in repository...")
         repository.store_store(cache_name, data)
+        # we have successfully stored to the repository, so we can clear all F_NEW flags now:
+        chunks.clear_new()
+        # delete some not needed cached chunk indexes, but never the one we just wrote:
         if delete_other:
-            delete_these = cached_hashes
+            delete_these = set(cached_hashes) - {new_hash}
         elif delete_these:
-            pass
+            delete_these = set(delete_these) - {new_hash}
         else:
-            delete_these = []
+            delete_these = set()
         for hash in delete_these:
             cache_name = f"cache/chunks.{hash}"
             try:
@@ -783,6 +796,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
                     write_chunkindex_to_repo_cache(
                         repository, chunks, clear=False, force_write=True, delete_these=hashes
                     )
+                else:
+                    chunks.clear_new()
                 return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
@@ -818,6 +833,8 @@ class ChunksMixin:
         self._chunks = None
         self.last_refresh_dt = datetime.now(timezone.utc)
         self.refresh_td = timedelta(seconds=60)
+        self.chunks_cache_last_write = datetime.now(timezone.utc)
+        self.chunks_cache_write_td = timedelta(seconds=600)
 
     @property
     def chunks(self):
@@ -864,6 +881,7 @@ class ChunksMixin:
             else:
                 raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         now = datetime.now(timezone.utc)
+        self._maybe_write_chunks_cache(now)
         exists = self.seen_chunk(id, size)
         if exists:
             # if borg create is processing lots of unchanged files (no content and not metadata changes),
@@ -879,10 +897,10 @@ class ChunksMixin:
         stats.update(size, not exists)
         return ChunkListEntry(id, size)
 
-    def _write_chunks_cache(self, chunks):
-        # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
-        self._chunks = None  # nothing there (cleared!)
+    def _maybe_write_chunks_cache(self, now, force=False, clear=False):
+        if force or now > self.chunks_cache_last_write + self.chunks_cache_write_td:
+            write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=clear)
+            self.chunks_cache_last_write = now
 
     def refresh_lock(self, now):
         if now > self.last_refresh_dt + self.refresh_td:
@@ -980,7 +998,9 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
             for key, value in sorted(self._chunks.stats.items()):
                 logger.debug(f"Chunks index stats: {key}: {value}")
             pi.output("Saving chunks cache")
-            self._write_chunks_cache(self._chunks)  # cache/chunks in repo has a different integrity mechanism
+            # note: cache/chunks.* in repo has a different integrity mechanism
+            self._maybe_write_chunks_cache(self._chunks, force=True, clear=True)
+            self._chunks = None  # nothing there (cleared!)
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
         self.cache_config.close()

+ 5 - 1
src/borg/hashindex.pyi

@@ -13,8 +13,12 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]]
 class ChunkIndex:
     F_NONE: int
     F_USED: int
+    F_NEW: int
+    M_USER: int
+    M_SYSTEM: int
     def add(self, key: bytes, size: int) -> None: ...
-    def iteritems(self, marker: bytes = ...) -> Iterator: ...
+    def iteritems(self, *, only_new: bool = ...) -> Iterator: ...
+    def clear_new(self) -> None: ...
     def __contains__(self, key: bytes) -> bool: ...
     def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ...
     def __setitem__(self, key: bytes, value: CIE) -> None: ...

+ 47 - 5
src/borg/hashindex.pyx

@@ -39,11 +39,16 @@ ChunkIndexEntry = namedtuple('ChunkIndexEntry', 'flags size')
 
 class ChunkIndex(HTProxyMixin, MutableMapping):
     """
-    Mapping from key256 to (refcount32, size32) to track chunks in the repository.
+    Mapping from key256 to (flags32, size32) to track chunks in the repository.
     """
-    # .flags values: 2^0 .. 2^31
+    # .flags related values:
     F_NONE = 0  # all flags cleared
-    F_USED = 1  # chunk is used/referenced
+    M_USER = 0x00ffffff  # mask for user flags
+    M_SYSTEM = 0xff000000  # mask for system flags
+    # user flags:
+    F_USED = 2 ** 0  # chunk is used/referenced
+    # system flags (internal use, always 0 to user, not changeable by user):
+    F_NEW = 2 ** 24  # a new chunk that is not present in repo/cache/chunks.* yet.
 
     def __init__(self, capacity=1000, path=None, usable=None):
         if path:
@@ -53,8 +58,15 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
                 capacity = usable * 2  # load factor 0.5
             self.ht = HashTableNT(key_size=32, value_format="<II", value_type=ChunkIndexEntry, capacity=capacity)
 
-    def iteritems(self):
-        yield from self.ht.items()
+    def hide_system_flags(self, value):
+        user_flags = value.flags & self.M_USER
+        return value._replace(flags=user_flags)
+
+    def iteritems(self, *, only_new=False):
+        """iterate items (optionally only new items), hide system flags."""
+        for key, value in self.ht.items():
+            if not only_new or (value.flags & self.F_NEW):
+                yield key, self.hide_system_flags(value)
 
     def add(self, key, size):
         v = self.get(key)
@@ -65,6 +77,36 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
             assert v.size == 0 or v.size == size
         self[key] = ChunkIndexEntry(flags=flags, size=size)
 
+    def __getitem__(self, key):
+        """specialized __getitem__ that hides system flags."""
+        value = self.ht[key]
+        return self.hide_system_flags(value)
+
+    def __setitem__(self, key, value):
+        """specialized __setitem__ that protects system flags, manages F_NEW flag."""
+        try:
+            prev = self.ht[key]
+        except KeyError:
+            prev_flags = self.F_NONE
+            is_new = True
+        else:
+            prev_flags = prev.flags
+            is_new = bool(prev_flags & self.F_NEW)  # was new? stays new!
+        system_flags = prev_flags & self.M_SYSTEM
+        if is_new:
+            system_flags |= self.F_NEW
+        else:
+            system_flags &= ~self.F_NEW
+        user_flags = value.flags & self.M_USER
+        self.ht[key] = value._replace(flags=system_flags | user_flags)
+
+    def clear_new(self):
+        """clear F_NEW flag of all items"""
+        for key, value in self.ht.items():
+            if value.flags & self.F_NEW:
+                flags = value.flags & ~self.F_NEW
+                self.ht[key] = value._replace(flags=flags)
+
     @classmethod
     def read(cls, path):
         return cls(path=path)

+ 20 - 1
src/borg/testsuite/hashindex_test.py

@@ -35,4 +35,23 @@ def test_keyerror():
     with pytest.raises(KeyError):
         chunks[x]
     with pytest.raises(struct.error):
-        chunks[x] = ChunkIndexEntry(flags=2**33, size=0)
+        chunks[x] = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=2**33)
+
+
+def test_new():
+    def new_chunks():
+        return list(chunks.iteritems(only_new=True))
+
+    chunks = ChunkIndex()
+    key1, value1a = H2(1), ChunkIndexEntry(flags=ChunkIndex.F_USED, size=23)
+    key2, value2a = H2(2), ChunkIndexEntry(flags=ChunkIndex.F_USED, size=42)
+    # tracking of new entries
+    assert new_chunks() == []
+    chunks[key1] = value1a
+    assert new_chunks() == [(key1, value1a)]
+    chunks.clear_new()
+    assert new_chunks() == []
+    chunks[key2] = value2a
+    assert new_chunks() == [(key2, value2a)]
+    chunks.clear_new()
+    assert new_chunks() == []