Procházet zdrojové kódy

Merge pull request #1448 from enkore/issue/1442

Fix untracked segments made by moved DELETEs
TW před 8 roky
rodič
revize
a12672530c
2 změnil soubory, kde provedl 66 přidání a 4 odebrání
  1. 42 3
      borg/repository.py
  2. 24 1
      borg/testsuite/repository.py

+ 42 - 3
borg/repository.py

@@ -292,6 +292,8 @@ class Repository:
                 self.io.delete_segment(segment)
                 self.io.delete_segment(segment)
             unused = []
             unused = []
 
 
+        # The first segment compaction creates, if any
+        first_new_segment = self.io.get_latest_segment() + 1
         for segment in sorted(self.compact):
         for segment in sorted(self.compact):
             if self.io.segment_exists(segment):
             if self.io.segment_exists(segment):
                 for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
                 for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
@@ -307,15 +309,52 @@ class Repository:
                         segments[segment] -= 1
                         segments[segment] -= 1
                     elif tag == TAG_DELETE:
                     elif tag == TAG_DELETE:
                         if index_transaction_id is None or segment > index_transaction_id:
                         if index_transaction_id is None or segment > index_transaction_id:
+                            # (introduced in 6425d16aa84be1eaaf88)
+                            # This is needed to avoid object un-deletion if we crash between the commit and the deletion
+                            # of old segments in complete_xfer().
+                            #
+                            # However, this only happens if the crash also affects the FS to the effect that file deletions
+                            # did not materialize consistently after journal recovery. If they always materialize in-order
+                            # then this is not a problem, because the old segment containing a deleted object would be deleted
+                            # before the segment containing the delete.
+                            #
+                            # Consider the following series of operations if we did not do this, i.e. if this entire
+                            # "if" block were removed.
+                            # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
+                            # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
+                            #
+                            # Segment | 1     | 2   | 3
+                            # --------+-------+-----+------
+                            # Key 1   | P     | D   |
+                            # Key 2   | P     |     | P
+                            # commits |   c i |   c |   c i
+                            # --------+-------+-----+------
+                            #                       ^- compact_segments starts
+                            #                           ^- complete_xfer commits, after that complete_xfer deletes
+                            #                              segments 1 and 2 (and then the index would be written).
+                            #
+                            # Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1
+                            # is suddenly undeleted (because the delete in segment 2 is now missing).
+                            # Again, note the requirement here: we delete these in the correct order so that this doesn't
+                            # happen; it can only happen if the FS materialization of these deletes is reordered or partly dropped.
+                            # In this case it doesn't cause outright corruption, 'just' an index count mismatch, which will be
+                            # fixed by borg-check --repair.
+                            #
+                            # Note that in this check the index state is the proxy for a "most definitely settled" repository state,
+                            # i.e. the assumption is that *all* operations on segments <= index state are completed and stable.
                             try:
                             try:
-                                self.io.write_delete(key, raise_full=save_space)
+                                new_segment = self.io.write_delete(key, raise_full=save_space)
                             except LoggedIO.SegmentFull:
                             except LoggedIO.SegmentFull:
                                 complete_xfer()
                                 complete_xfer()
-                                self.io.write_delete(key)
+                                new_segment = self.io.write_delete(key)
+                            self.compact.add(new_segment)
+                            self.segments.setdefault(new_segment, 0)
                 assert segments[segment] == 0
                 assert segments[segment] == 0
                 unused.append(segment)
                 unused.append(segment)
         complete_xfer()
         complete_xfer()
-        self.compact = set()
+        # Moving of deletes creates new sparse segments, only store these. All other segments
+        # are compact now.
+        self.compact = {segment for segment in self.compact if segment >= first_new_segment}
 
 
     def replay_segments(self, index_transaction_id, segments_transaction_id):
     def replay_segments(self, index_transaction_id, segments_transaction_id):
         # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
         # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:

+ 24 - 1
borg/testsuite/repository.py

@@ -8,8 +8,9 @@ from ..hashindex import NSIndex
 from ..helpers import Location, IntegrityError
 from ..helpers import Location, IntegrityError
 from ..locking import Lock, LockFailed
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod
 from ..remote import RemoteRepository, InvalidRPCMethod
-from ..repository import Repository, LoggedIO, TAG_COMMIT, MAX_DATA_SIZE
+from ..repository import Repository, LoggedIO, TAG_DELETE, MAX_DATA_SIZE
 from . import BaseTestCase
 from . import BaseTestCase
+from .hashindex import H
 
 
 
 
 UNSPECIFIED = object()  # for default values where we can't use None
 UNSPECIFIED = object()  # for default values where we can't use None
@@ -227,6 +228,28 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
             io = self.repository.io
             io = self.repository.io
             assert not io.is_committed_segment(io.get_latest_segment())
             assert not io.is_committed_segment(io.get_latest_segment())
 
 
+    def test_moved_deletes_are_tracked(self):
+        self.repository.put(H(1), b'1')
+        self.repository.put(H(2), b'2')
+        self.repository.commit()
+        self.repository.delete(H(1))
+        self.repository.commit()
+        last_segment = self.repository.io.get_latest_segment()
+        num_deletes = 0
+        for tag, key, offset, data in self.repository.io.iter_objects(last_segment, include_data=True):
+            if tag == TAG_DELETE:
+                assert key == H(1)
+                num_deletes += 1
+        assert num_deletes == 1
+        assert last_segment in self.repository.compact
+        self.repository.put(H(3), b'3')
+        self.repository.commit()
+        assert last_segment not in self.repository.compact
+        assert not self.repository.io.segment_exists(last_segment)
+        last_segment = self.repository.io.get_latest_segment()
+        for tag, key, offset in self.repository.io.iter_objects(last_segment):
+            assert tag != TAG_DELETE
+
 
 
 class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
 class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
     def open(self, create=False):
     def open(self, create=False):