浏览代码

Merge pull request #1484 from enkore/issue/1396

Track shadowing of log entries
TW 8 年之前
父节点
当前提交
32e84f33e9
共有 2 个文件被更改,包括 92 次插入5 次删除
  1. 42 5
      src/borg/repository.py
  2. 50 0
      src/borg/testsuite/repository.py

+ 42 - 5
src/borg/repository.py

@@ -22,6 +22,7 @@ from .helpers import Location
 from .helpers import ProgressIndicatorPercent
 from .helpers import bin_to_hex
 from .locking import Lock, LockError, LockErrorT
+from .logger import create_logger
 from .lrucache import LRUCache
 from .platform import SaveFile, SyncFile, sync_dir
 
@@ -110,6 +111,10 @@ class Repository:
         self.io = None
         self.lock = None
         self.index = None
+        # This is an index of shadowed log entries during this transaction. Consider the following sequence:
+        # segment_n PUT A, segment_x DELETE A
+        # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]".
+        self.shadow_index = {}
         self._active_txn = False
         self.lock_wait = lock_wait
         self.do_lock = lock
@@ -308,6 +313,7 @@ class Repository:
         if transaction_id is None:
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
             self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
+            self.shadow_index.clear()
         else:
             if do_cleanup:
                 self.io.cleanup(transaction_id)
@@ -338,6 +344,11 @@ class Repository:
             else:
                 self.segments = hints[b'segments']
                 self.compact = FreeSpace(hints[b'compact'])
+            # Drop uncommitted segments in the shadow index
+            for key, shadowed_segments in self.shadow_index.items():
+                for segment in list(shadowed_segments):
+                    if segment > transaction_id:
+                        shadowed_segments.remove(segment)
 
     def write_index(self):
         hints = {b'version': 2,
@@ -413,31 +424,40 @@ class Repository:
         index_transaction_id = self.get_index_transaction_id()
         segments = self.segments
         unused = []  # list of segments, that are not used anymore
+        logger = create_logger('borg.debug.compact_segments')
 
         def complete_xfer(intermediate=True):
             # complete the current transfer (when some target segment is full)
             nonlocal unused
             # commit the new, compact, used segments
-            self.io.write_commit(intermediate=intermediate)
+            segment = self.io.write_commit(intermediate=intermediate)
+            logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
             # get rid of the old, sparse, unused segments. free space.
             for segment in unused:
+                logger.debug('complete_xfer: deleting unused segment %d', segment)
                 assert self.segments.pop(segment) == 0
                 self.io.delete_segment(segment)
                 del self.compact[segment]
             unused = []
 
+        logger.debug('compaction started.')
         for segment, freeable_space in sorted(self.compact.items()):
             if not self.io.segment_exists(segment):
+                logger.warning('segment %d not found, but listed in compaction data', segment)
                 del self.compact[segment]
                 continue
             segment_size = self.io.segment_size(segment)
             if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
-                logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
-                             segment, freeable_space)
+                logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space)
                 continue
             segments.setdefault(segment, 0)
+            logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
+                         segment, segments[segment], freeable_space)
             for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
-                if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
+                if tag == TAG_COMMIT:
+                    continue
+                in_index = self.index.get(key) == (segment, offset)
+                if tag == TAG_PUT and in_index:
                     try:
                         new_segment, offset = self.io.write_put(key, data, raise_full=True)
                     except LoggedIO.SegmentFull:
@@ -447,8 +467,22 @@ class Repository:
                     segments.setdefault(new_segment, 0)
                     segments[new_segment] += 1
                     segments[segment] -= 1
+                elif tag == TAG_PUT and not in_index:
+                    # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
+                    # this loop. Therefore it is removed from the shadow index.
+                    try:
+                        self.shadow_index[key].remove(segment)
+                    except (KeyError, ValueError):
+                        pass
                 elif tag == TAG_DELETE:
-                    if index_transaction_id is None or segment > index_transaction_id:
+                    # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
+                    # therefore we do not drop the delete, but write it to a current segment.
+                    shadowed_put_exists = key not in self.shadow_index or any(
+                        # If the key is in the shadow index and there is any segment with an older PUT of this
+                        # key, we have a shadowed put.
+                        shadowed < segment for shadowed in self.shadow_index[key])
+
+                    if shadowed_put_exists or index_transaction_id is None or segment > index_transaction_id:
                         # (introduced in 6425d16aa84be1eaaf88)
                         # This is needed to avoid object un-deletion if we crash between the commit and the deletion
                         # of old segments in complete_xfer().
@@ -492,6 +526,7 @@ class Repository:
             assert segments[segment] == 0
             unused.append(segment)
         complete_xfer(intermediate=False)
+        logger.debug('compaction completed.')
 
     def replay_segments(self, index_transaction_id, segments_transaction_id):
         # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
@@ -705,6 +740,7 @@ class Repository:
             segment, offset = self.index.pop(id)
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
+        self.shadow_index.setdefault(id, []).append(segment)
         self.segments[segment] -= 1
         size = self.io.read(segment, offset, id, read_data=False)
         self.compact[segment] += size
@@ -1017,6 +1053,7 @@ class LoggedIO:
         crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
         fd.write(b''.join((crc, header)))
         self.close_segment()
+        return self.segment - 1  # close_segment() increments it
 
 
 MAX_DATA_SIZE = MAX_OBJECT_SIZE - LoggedIO.put_header_fmt.size

+ 50 - 0
src/borg/testsuite/repository.py

@@ -295,6 +295,56 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
             for tag, key, offset, size in self.repository.io.iter_objects(segment):
                 assert tag != TAG_DELETE
 
+    def test_shadowed_entries_are_preserved(self):
+        get_latest_segment = self.repository.io.get_latest_segment
+        self.repository.put(H(1), b'1')
+        # This is the segment with our original PUT of interest
+        put_segment = get_latest_segment()
+        self.repository.commit()
+
+        # We now delete H(1), and force this segment to not be compacted, which can happen
+        # if it's not sparse enough (symbolized by H(2) here).
+        self.repository.delete(H(1))
+        self.repository.put(H(2), b'1')
+        delete_segment = get_latest_segment()
+
+        # We pretend these are mostly dense (not sparse) and won't be compacted
+        del self.repository.compact[put_segment]
+        del self.repository.compact[delete_segment]
+
+        self.repository.commit()
+
+        # Now we perform an unrelated operation on the segment containing the DELETE,
+        # causing it to be compacted.
+        self.repository.delete(H(2))
+        self.repository.commit()
+
+        assert self.repository.io.segment_exists(put_segment)
+        assert not self.repository.io.segment_exists(delete_segment)
+
+        # Basic case, since the index survived this must be ok
+        assert H(1) not in self.repository
+        # Nuke index, force replay
+        os.unlink(os.path.join(self.repository.path, 'index.%d' % get_latest_segment()))
+        # Must not reappear
+        assert H(1) not in self.repository
+
+    def test_shadow_index_rollback(self):
+        self.repository.put(H(1), b'1')
+        self.repository.delete(H(1))
+        assert self.repository.shadow_index[H(1)] == [0]
+        self.repository.commit()
+        # note how an empty list means that nothing is shadowed for sure
+        assert self.repository.shadow_index[H(1)] == []
+        self.repository.put(H(1), b'1')
+        self.repository.delete(H(1))
+        # 0 put/delete; 1 commit; 2 compacted; 3 commit; 4 put/delete
+        assert self.repository.shadow_index[H(1)] == [4]
+        self.repository.rollback()
+        self.repository.put(H(2), b'1')
+        # After the rollback segment 4 shouldn't be considered anymore
+        assert self.repository.shadow_index[H(1)] == []
+
 
 class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
     def open(self, create=False):