
cache/hashindex: remove decref method, don't try to remove chunks on exceptions

When the AdHocCache(WithFiles) queries chunk IDs from the repo to build the chunks
index, it does not know their refcounts, so every chunk in that index gets a refcount
of MAX_VALUE (representing "infinite"). That count never decreases, so it can never
reach zero and the chunk would never get deleted from the repo.

Only chunks newly written in the current borg run have a valid refcount.

In some exception handlers, borg tried to clean up chunks that would not be used
by an item by decref'ing them. That is either:
- pointless, because the refcount is already at MAX_VALUE, or
- inefficient, because the user might retry the backup and would then need to
  transmit these chunks to the repo again.

We now rely solely on borg compact to clean up any unused/orphan chunks.
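For illustration only (not part of the commit and not the actual hashindex code): a minimal Python sketch of why decref'ing a chunk whose refcount sits at MAX_VALUE can never free it. The MAX_VALUE constant and the ToyChunkIndex class below are made up for this example.

# Minimal sketch, assuming a dict-based stand-in for the real C hashindex.
MAX_VALUE = 2**32 - 1025  # illustrative "infinite" marker, not necessarily borg's real constant

class ToyChunkIndex:
    def __init__(self):
        self.refcounts = {}  # chunk id -> refcount

    def add_known_chunk(self, chunk_id):
        # chunks merely queried from the repo: refcount unknown -> "infinite"
        self.refcounts[chunk_id] = MAX_VALUE

    def add_new_chunk(self, chunk_id):
        # only chunks first written in this borg run get a real refcount
        self.refcounts[chunk_id] = 1

    def decref(self, chunk_id):
        count = self.refcounts[chunk_id]
        if count != MAX_VALUE:  # "infinite" refcounts are never decreased
            count -= 1
            self.refcounts[chunk_id] = count
        return count

idx = ToyChunkIndex()
idx.add_known_chunk(b"chunk-from-repo")
# decref is a no-op here: the count stays at MAX_VALUE and can never reach zero,
# so exception-handler cleanup based on decref could not delete such a chunk anyway.
assert idx.decref(b"chunk-from-repo") == MAX_VALUE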
Thomas Waldmann 9 months ago
parent
commit
ef47666627

+ 86 - 114
src/borg/archive.py

@@ -952,7 +952,6 @@ Duration: {0.duration}
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
-        self.cache.chunk_decref(self.id, 1, self.stats)
         self.id = new_id

     def rename(self, name):
@@ -1309,20 +1308,11 @@ class FilesystemObjectProcessors:
             item.uid = uid
         if gid is not None:
             item.gid = gid
-        try:
-            self.process_file_chunks(
-                item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
-            )
-        except BackupOSError:
-            # see comments in process_file's exception handler, same issue here.
-            for chunk in item.get("chunks", []):
-                cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
-            raise
-        else:
-            item.get_size(memorize=True)
-            self.stats.nfiles += 1
-            self.add_item(item, stats=self.stats)
-            return status
+        self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
+        item.get_size(memorize=True)
+        self.stats.nfiles += 1
+        self.add_item(item, stats=self.stats)
+        return status

     def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, last_try=False, strip_prefix):
         with self.create_helper(path, st, None, strip_prefix=strip_prefix) as (
@@ -1343,93 +1333,81 @@ class FilesystemObjectProcessors:
                     # so it can be extracted / accessed in FUSE mount like a regular file.
                     # this needs to be done early, so that part files also get the patched mode.
                     item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
-                # we begin processing chunks now (writing or incref'ing them to the repository),
-                # which might require cleanup (see except-branch):
-                try:
-                    if hl_chunks is not None:  # create_helper gave us chunks from a previous hardlink
-                        item.chunks = []
-                        for chunk_id, chunk_size in hl_chunks:
-                            # process one-by-one, so we will know in item.chunks how far we got
-                            chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
-                            item.chunks.append(chunk_entry)
-                    else:  # normal case, no "2nd+" hardlink
-                        if not is_special_file:
-                            hashed_path = safe_encode(os.path.join(self.cwd, path))
-                            started_hashing = time.monotonic()
-                            path_hash = self.key.id_hash(hashed_path)
-                            self.stats.hashing_time += time.monotonic() - started_hashing
-                            known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
+                # we begin processing chunks now.
+                if hl_chunks is not None:  # create_helper gave us chunks from a previous hardlink
+                    item.chunks = []
+                    for chunk_id, chunk_size in hl_chunks:
+                        # process one-by-one, so we will know in item.chunks how far we got
+                        chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
+                        item.chunks.append(chunk_entry)
+                else:  # normal case, no "2nd+" hardlink
+                    if not is_special_file:
+                        hashed_path = safe_encode(os.path.join(self.cwd, path))
+                        started_hashing = time.monotonic()
+                        path_hash = self.key.id_hash(hashed_path)
+                        self.stats.hashing_time += time.monotonic() - started_hashing
+                        known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
+                    else:
+                        # in --read-special mode, we may be called for special files.
+                        # there should be no information in the cache about special files processed in
+                        # read-special mode, but we better play safe as this was wrong in the past:
+                        hashed_path = path_hash = None
+                        known, chunks = False, None
+                    if chunks is not None:
+                        # Make sure all ids are available
+                        for chunk in chunks:
+                            if not cache.seen_chunk(chunk.id):
+                                # cache said it is unmodified, but we lost a chunk: process file like modified
+                                status = "M"
+                                break
                         else:
-                            # in --read-special mode, we may be called for special files.
-                            # there should be no information in the cache about special files processed in
-                            # read-special mode, but we better play safe as this was wrong in the past:
-                            hashed_path = path_hash = None
-                            known, chunks = False, None
-                        if chunks is not None:
-                            # Make sure all ids are available
+                            item.chunks = []
                             for chunk in chunks:
-                                if not cache.seen_chunk(chunk.id):
-                                    # cache said it is unmodified, but we lost a chunk: process file like modified
-                                    status = "M"
-                                    break
+                                # process one-by-one, so we will know in item.chunks how far we got
+                                cache.chunk_incref(chunk.id, chunk.size, self.stats)
+                                item.chunks.append(chunk)
+                            status = "U"  # regular file, unchanged
+                    else:
+                        status = "M" if known else "A"  # regular file, modified or added
+                    self.print_file_status(status, path)
+                    # Only chunkify the file if needed
+                    changed_while_backup = False
+                    if "chunks" not in item:
+                        with backup_io("read"):
+                            self.process_file_chunks(
+                                item,
+                                cache,
+                                self.stats,
+                                self.show_progress,
+                                backup_io_iter(self.chunker.chunkify(None, fd)),
+                            )
+                            self.stats.chunking_time = self.chunker.chunking_time
+                        if not is_win32:  # TODO for win32
+                            with backup_io("fstat2"):
+                                st2 = os.fstat(fd)
+                            # special files:
+                            # - fifos change naturally, because they are fed from the other side. no problem.
+                            # - blk/chr devices don't change ctime anyway.
+                            changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
+                        if changed_while_backup:
+                            # regular file changed while we backed it up, might be inconsistent/corrupt!
+                            if last_try:
+                                status = "C"  # crap! retries did not help.
                             else:
-                                item.chunks = []
-                                for chunk in chunks:
-                                    # process one-by-one, so we will know in item.chunks how far we got
-                                    cache.chunk_incref(chunk.id, chunk.size, self.stats)
-                                    item.chunks.append(chunk)
-                                status = "U"  # regular file, unchanged
-                        else:
-                            status = "M" if known else "A"  # regular file, modified or added
-                        self.print_file_status(status, path)
-                        # Only chunkify the file if needed
-                        changed_while_backup = False
-                        if "chunks" not in item:
-                            with backup_io("read"):
-                                self.process_file_chunks(
-                                    item,
-                                    cache,
-                                    self.stats,
-                                    self.show_progress,
-                                    backup_io_iter(self.chunker.chunkify(None, fd)),
-                                )
-                                self.stats.chunking_time = self.chunker.chunking_time
-                            if not is_win32:  # TODO for win32
-                                with backup_io("fstat2"):
-                                    st2 = os.fstat(fd)
-                                # special files:
-                                # - fifos change naturally, because they are fed from the other side. no problem.
-                                # - blk/chr devices don't change ctime anyway.
-                                changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
-                            if changed_while_backup:
-                                # regular file changed while we backed it up, might be inconsistent/corrupt!
-                                if last_try:
-                                    status = "C"  # crap! retries did not help.
-                                else:
-                                    raise BackupError("file changed while we read it!")
-                            if not is_special_file and not changed_while_backup:
-                                # we must not memorize special files, because the contents of e.g. a
-                                # block or char device will change without its mtime/size/inode changing.
-                                # also, we must not memorize a potentially inconsistent/corrupt file that
-                                # changed while we backed it up.
-                                cache.memorize_file(hashed_path, path_hash, st, item.chunks)
-                        self.stats.files_stats[status] += 1  # must be done late
-                        if not changed_while_backup:
-                            status = None  # we already called print_file_status
-                    self.stats.nfiles += 1
-                    item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
-                    item.get_size(memorize=True)
-                    return status
-                except BackupOSError:
-                    # Something went wrong and we might need to clean up a bit.
-                    # Maybe we have already incref'ed some file content chunks in the repo -
-                    # but we will not add an item (see add_item in create_helper) and thus
-                    # they would be orphaned chunks in case that we commit the transaction.
-                    for chunk in item.get("chunks", []):
-                        cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
-                    # Now that we have cleaned up the chunk references, we can re-raise the exception.
-                    # This will skip processing of this file, but might retry or continue with the next one.
-                    raise
+                                raise BackupError("file changed while we read it!")
+                        if not is_special_file and not changed_while_backup:
+                            # we must not memorize special files, because the contents of e.g. a
+                            # block or char device will change without its mtime/size/inode changing.
+                            # also, we must not memorize a potentially inconsistent/corrupt file that
+                            # changed while we backed it up.
+                            cache.memorize_file(hashed_path, path_hash, st, item.chunks)
+                    self.stats.files_stats[status] += 1  # must be done late
+                    if not changed_while_backup:
+                        status = None  # we already called print_file_status
+                self.stats.nfiles += 1
+                item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
+                item.get_size(memorize=True)
+                return status


 class TarfileObjectProcessors:
@@ -1524,21 +1502,15 @@ class TarfileObjectProcessors:
         with self.create_helper(tarinfo, status, type) as (item, status):
             self.print_file_status(status, tarinfo.name)
             status = None  # we already printed the status
-            try:
-                fd = tar.extractfile(tarinfo)
-                self.process_file_chunks(
-                    item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
-                )
-                item.get_size(memorize=True, from_chunks=True)
-                self.stats.nfiles += 1
-                # we need to remember ALL files, see HardLinkManager.__doc__
-                self.hlm.remember(id=tarinfo.name, info=item.chunks)
-                return status
-            except BackupOSError:
-                # see comment in FilesystemObjectProcessors.process_file, same issue here.
-                for chunk in item.get("chunks", []):
-                    self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
-                raise
+            fd = tar.extractfile(tarinfo)
+            self.process_file_chunks(
+                item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
+            )
+            item.get_size(memorize=True, from_chunks=True)
+            self.stats.nfiles += 1
+            # we need to remember ALL files, see HardLinkManager.__doc__
+            self.hlm.remember(id=tarinfo.name, info=item.chunks)
+            return status


 def valid_msgpacked_dict(d, keys_serialized):

+ 0 - 10
src/borg/cache.py

@@ -585,16 +585,6 @@ class ChunksMixin:
         stats.update(size, False)
         return ChunkListEntry(id, size)

-    def chunk_decref(self, id, size, stats, wait=True):
-        assert isinstance(size, int) and size > 0
-        count, _size = self.chunks.decref(id)
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
-
     def seen_chunk(self, id, size=None):
         entry = self.chunks.get(id, ChunkIndexEntry(0, None))
         if entry.refcount and size is not None:

+ 0 - 1
src/borg/hashindex.pyi

@@ -38,7 +38,6 @@ class ChunkKeyIterator:

 class ChunkIndex(IndexBase):
     def add(self, key: bytes, refs: int, size: int) -> None: ...
-    def decref(self, key: bytes) -> CIE: ...
     def incref(self, key: bytes) -> CIE: ...
     def iteritems(self, marker: bytes = ...) -> Iterator: ...
     def merge(self, other_index) -> None: ...

+ 0 - 14
src/borg/hashindex.pyx

@@ -407,20 +407,6 @@ cdef class ChunkIndex(IndexBase):
         data[0] = _htole32(refcount)
         return refcount, _le32toh(data[1])

-    def decref(self, key):
-        """Decrease refcount for 'key', return (refcount, size)"""
-        assert len(key) == self.key_size
-        data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
-        if not data:
-            raise KeyError(key)
-        cdef uint32_t refcount = _le32toh(data[0])
-        # Never decrease a reference count of zero
-        assert 0 < refcount <= _MAX_VALUE, "invalid reference count"
-        if refcount != _MAX_VALUE:
-            refcount -= 1
-        data[0] = _htole32(refcount)
-        return refcount, _le32toh(data[1])
-
     def iteritems(self, marker=None):
         cdef const unsigned char *key
         iter = ChunkKeyIterator(self.key_size)

+ 1 - 1
src/borg/selftest.py

@@ -33,7 +33,7 @@ SELFTEST_CASES = [
     ChunkerTestCase,
 ]

-SELFTEST_COUNT = 30
+SELFTEST_COUNT = 28


 class SelfTestResult(TestResult):

+ 0 - 14
src/borg/testsuite/cache.py

@@ -7,7 +7,6 @@ from .key import TestKey
 from ..archive import Statistics
 from ..cache import AdHocCache
 from ..crypto.key import AESOCBRepoKey
-from ..hashindex import ChunkIndex
 from ..manifest import Manifest
 from ..repository import Repository

@@ -38,22 +37,9 @@ class TestAdHocCache:
     def test_does_not_contain_manifest(self, cache):
         assert not cache.seen_chunk(Manifest.MANIFEST_ID)

-    def test_does_not_delete_existing_chunks(self, repository, cache):
-        assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
-        cache.chunk_decref(H(1), 1, Statistics())
-        assert repository.get(H(1)) == b"1234"
-
     def test_seen_chunk_add_chunk_size(self, cache):
         assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)

-    def test_deletes_chunks_during_lifetime(self, cache, repository):
-        cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
-        assert cache.seen_chunk(H(5)) == 1
-        cache.chunk_decref(H(5), 1, Statistics())
-        assert not cache.seen_chunk(H(5))
-        with pytest.raises(Repository.ObjectNotFound):
-            repository.get(H(5))
-
     def test_files_cache(self, cache):
         assert cache.file_known_and_unchanged(b"foo", bytes(32), None) == (False, None)
         assert cache.cache_mode == "d"

+ 1 - 21
src/borg/testsuite/hashindex.py

@@ -198,9 +198,6 @@ class HashIndexRefcountingTestCase(BaseTestCase):
             # first incref to move it to the limit
             refcount, *_ = idx.incref(H(1))
             assert refcount == ChunkIndex.MAX_VALUE
-        for i in range(5):
-            refcount, *_ = idx.decref(H(1))
-            assert refcount == ChunkIndex.MAX_VALUE

     def _merge(self, refcounta, refcountb):
         def merge(refcount1, refcount2):
@@ -252,27 +249,12 @@ class HashIndexRefcountingTestCase(BaseTestCase):
         refcount, *_ = idx1[H(1)]
         assert refcount == ChunkIndex.MAX_VALUE

-    def test_decref_limit(self):
-        idx1 = ChunkIndex()
-        idx1[H(1)] = ChunkIndex.MAX_VALUE, 6
-        idx1.decref(H(1))
-        refcount, *_ = idx1[H(1)]
-        assert refcount == ChunkIndex.MAX_VALUE
-
-    def test_decref_zero(self):
-        idx1 = ChunkIndex()
-        idx1[H(1)] = 0, 0
-        with self.assert_raises(AssertionError):
-            idx1.decref(H(1))
-
-    def test_incref_decref(self):
+    def test_incref(self):
         idx1 = ChunkIndex()
         idx1.add(H(1), 5, 6)
         assert idx1[H(1)] == (5, 6)
         idx1.incref(H(1))
         assert idx1[H(1)] == (6, 6)
-        idx1.decref(H(1))
-        assert idx1[H(1)] == (5, 6)

     def test_setitem_raises(self):
         idx1 = ChunkIndex()
@@ -283,8 +265,6 @@ class HashIndexRefcountingTestCase(BaseTestCase):
         idx = ChunkIndex()
         with self.assert_raises(KeyError):
             idx.incref(H(1))
-        with self.assert_raises(KeyError):
-            idx.decref(H(1))
         with self.assert_raises(KeyError):
             idx[H(1)]
         with self.assert_raises(OverflowError):