
repository.scan: do not use chunkid as marker, but (segment, offset)

when using .scan(limit, marker), we used to use the last chunkid from
the previously returned scan result to remember how far we got and
from where we needed to continue.

as this approach used the repo index to look up the chunk's current
segment/offset, it broke if the code using scan rewrote the chunk to a
new segment/offset and updated the repo index (e.g. when recompressing
a chunk), destroying the memory of where we need to continue scanning.

thus, directly returning (segment, offset) as the marker is simpler and solves this issue.
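
To make the change concrete, here is a minimal caller-side sketch of the old and new pagination styles, mirroring the loops changed below (assuming an open repository object; the loop bodies are elided):

    # old convention: the marker is the last chunkid of the previous result.
    # scan() resolved it to (segment, offset) via the repo index, so rewriting
    # that chunk in the loop body redirected the next scan() call.
    marker = None
    while True:
        ids = repository.scan(limit=100, marker=marker)
        if not ids:
            break
        marker = ids[-1]  # a chunkid, only meaningful via the mutable repo index
        ...  # process ids

    # new convention: scan() itself returns a (segment, offset) marker, a plain
    # position in the segment files that chunk rewrites cannot invalidate.
    marker = None
    while True:
        ids, marker = repository.scan(limit=100, marker=marker)
        if not ids:
            break
        ...  # process ids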
Thomas Waldmann, 2 years ago
commit 49a4884cfe
4 changed files: 40 insertions, 25 deletions

  1. src/borg/archive.py (+1, -2)
  2. src/borg/archiver/debug_cmd.py (+8, -8)
  3. src/borg/repository.py (+24, -10)
  4. src/borg/testsuite/repository.py (+7, -5)

src/borg/archive.py (+1, -2)

@@ -1749,11 +1749,10 @@ class ArchiveChecker:
         )
         marker = None
         while True:
-            chunk_ids = self.repository.scan(limit=100, marker=marker)
+            chunk_ids, marker = self.repository.scan(limit=100, marker=marker)
             if not chunk_ids:
                 break
             chunks_count_segments += len(chunk_ids)
-            marker = chunk_ids[-1]
             chunk_data_iter = self.repository.get_many(chunk_ids)
             chunk_ids_revd = list(reversed(chunk_ids))
             while chunk_ids_revd:

src/borg/archiver/debug_cmd.py (+8, -8)

@@ -155,11 +155,12 @@ class DebugMixIn:
             marker = None
             i = 0
             while True:
-                result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
-                if not result:
+                ids, marker = repository.scan(
+                    limit=LIST_SCAN_LIMIT, marker=marker
+                )  # must use on-disk order scanning here
+                if not ids:
                     break
-                marker = result[-1]
-                for id in result:
+                for id in ids:
                     cdata = repository.get(id)
                     decrypt_dump(i, id, cdata)
                     i += 1
@@ -207,11 +208,10 @@ class DebugMixIn:
         last_id = None
         i = 0
         while True:
-            result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
-            if not result:
+            ids, marker = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
+            if not ids:
                 break
-            marker = result[-1]
-            for id in result:
+            for id in ids:
                 cdata = repository.get(id)
                 _, data = repo_objs.parse(id, cdata)
 

src/borg/repository.py (+24, -10)

@@ -1209,9 +1209,14 @@ class Repository:
 
     def scan(self, limit=None, marker=None):
         """
-        list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
+        list <limit> IDs starting from after <marker> - in on-disk order, so that a client
         fetching data in this order does linear reads and reuses stuff from disk cache.
 
+        marker can either be None (default, meaning "start from the beginning") or the object
+        returned from a previous scan call (meaning "continue scanning where we stopped previously").
+
+        returns: list of chunk ids, marker
+
         We rely on repository.check() having run already (either now or some time before) and that:
 
         - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
@@ -1224,10 +1229,11 @@ class Repository:
         transaction_id = self.get_transaction_id()
         if not self.index:
             self.index = self.open_index(transaction_id)
-        at_start = marker is None
         # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
-        start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
-        result = []
+        start_segment, start_offset = marker if marker is not None else (0, 0)
+        ids, segment, offset = [], 0, 0
+        # we only scan up to end_segment == transaction_id to only scan **committed** chunks,
+        # avoiding scanning into newly written chunks.
         for segment, filename in self.io.segment_iterator(start_segment, transaction_id):
             obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False)
             while True:
@@ -1247,10 +1253,10 @@ class Repository:
                     in_index = self.index.get(id)
                     if in_index and (in_index.segment, in_index.offset) == (segment, offset):
                         # we have found an existing and current object
-                        result.append(id)
-                        if len(result) == limit:
-                            return result
-        return result
+                        ids.append(id)
+                        if len(ids) == limit:
+                            return ids, (segment, offset)
+        return ids, (segment, offset)
 
     def flags(self, id, mask=0xFFFFFFFF, value=None):
         """
@@ -1407,9 +1413,17 @@ class LoggedIO:
         for dir in dirs:
             filenames = os.listdir(os.path.join(data_path, dir))
             if not reverse:
-                filenames = [filename for filename in filenames if filename.isdigit() and start_segment <= int(filename) <= end_segment]
+                filenames = [
+                    filename
+                    for filename in filenames
+                    if filename.isdigit() and start_segment <= int(filename) <= end_segment
+                ]
             else:
-                filenames = [filename for filename in filenames if filename.isdigit() and start_segment >= int(filename) >= end_segment]
+                filenames = [
+                    filename
+                    for filename in filenames
+                    if filename.isdigit() and start_segment >= int(filename) >= end_segment
+                ]
             filenames = sorted(filenames, key=int, reverse=reverse)
             for filename in filenames:
                 # Note: Do not filter out logically deleted segments  (see "File system interaction" above),
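
The scenario from the commit message, rewriting chunks while scanning, then looks roughly like this with the new API (a sketch only: recompress() is a hypothetical stand-in for whatever rewrites the chunk data):

    marker = None
    while True:
        ids, marker = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)
        if not ids:
            break
        for id in ids:
            cdata = repository.get(id)
            # hypothetical rewrite: the chunk moves to a new segment/offset
            # and the repo index is updated to point there
            repository.put(id, recompress(cdata))
    repository.commit(compact=True)

With the old chunkid marker, each put() above changed what the marker resolves to, so the next scan() call could continue from the chunk's new location instead of from where scanning actually stopped; the (segment, offset) marker is a fixed position in the already-scanned segments and is unaffected.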

src/borg/testsuite/repository.py (+7, -5)

@@ -189,12 +189,12 @@ class RepositoryTestCase(RepositoryTestCaseBase):
         for x in range(100):
             self.repository.put(H(x), fchunk(b"SOMEDATA"))
         self.repository.commit(compact=False)
-        all = self.repository.scan()
+        all, _ = self.repository.scan()
         assert len(all) == 100
-        first_half = self.repository.scan(limit=50)
+        first_half, marker = self.repository.scan(limit=50)
         assert len(first_half) == 50
         assert first_half == all[:50]
-        second_half = self.repository.scan(marker=first_half[-1])
+        second_half, _ = self.repository.scan(marker=marker)
         assert len(second_half) == 50
         assert second_half == all[50:]
         # check result order == on-disk order (which is hash order)
@@ -207,7 +207,8 @@ class RepositoryTestCase(RepositoryTestCaseBase):
         self.repository.commit(compact=False)
         # now we scan, read and modify chunks at the same time
         count = 0
-        for id in self.repository.scan():
+        ids, _ = self.repository.scan()
+        for id in ids:
             # scan results are in same order as we put the chunks into the repo (into the segment file)
             assert id == H(count)
             chunk = self.repository.get(id)
@@ -221,7 +222,8 @@ class RepositoryTestCase(RepositoryTestCaseBase):
 
         # now we have committed all the modified chunks, and **only** must get the modified ones.
         count = 0
-        for id in self.repository.scan():
+        ids, _ = self.repository.scan()
+        for id in ids:
             # scan results are in same order as we put the chunks into the repo (into the segment file)
             assert id == H(count)
             chunk = self.repository.get(id)