
repository/repository3: remove .scan method

This was an implementation-specific "in on-disk order" list method that only made
sense with borg 1.x's log-like segment files.

But we now store objects separately, so there is no "in on-disk order" anymore.
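
Callers migrate from the state-based scan() loop to marker-based pagination via
list(). A minimal sketch of that pattern, assuming a repository object with the
list(limit=..., marker=...) API touched in the hunks below; LIST_SCAN_LIMIT is the
constant from src/borg/constants.py and process() is a hypothetical placeholder for
per-object handling:

    from borg.constants import LIST_SCAN_LIMIT  # result count limit the borg client uses

    marker = None
    while True:
        ids = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
        if not ids:
            break
        # the marker is the last id of the previous batch, i.e. "continue after this id"
        marker = ids[-1]
        for id in ids:
            process(id)  # hypothetical, e.g. repository.get(id) plus further handling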
Thomas Waldmann 9 months ago
parent
commit
60edc8255f

+ 3 - 2
src/borg/archive.py

@@ -1843,11 +1843,12 @@ class ArchiveChecker:
         pi = ProgressIndicatorPercent(
             total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01, msgid="check.verify_data"
         )
-        state = None
+        marker = None
         while True:
-            chunk_ids, state = self.repository.scan(limit=100, state=state)
+            chunk_ids = self.repository.list(limit=100, marker=marker)
             if not chunk_ids:
                 break
+            marker = chunk_ids[-1]
             chunks_count_segments += len(chunk_ids)
             chunk_data_iter = self.repository.get_many(chunk_ids)
             chunk_ids_revd = list(reversed(chunk_ids))

+ 10 - 14
src/borg/archiver/debug_cmd.py

@@ -110,19 +110,15 @@ class DebugMixIn:
 
     @with_repository(manifest=False)
     def do_debug_dump_repo_objs(self, args, repository):
-        """dump (decrypted, decompressed) repo objects, repo index MUST be current/correct"""
+        """dump (decrypted, decompressed) repo objects"""
         from ..crypto.key import key_factory
 
-        def decrypt_dump(i, id, cdata, tag=None, segment=None, offset=None):
+        def decrypt_dump(id, cdata):
             if cdata is not None:
                 _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
             else:
                 _, data = {}, b""
-            tag_str = "" if tag is None else "_" + tag
-            segment_str = "_" + str(segment) if segment is not None else ""
-            offset_str = "_" + str(offset) if offset is not None else ""
-            id_str = "_" + bin_to_hex(id) if id is not None else ""
-            filename = "%08d%s%s%s%s.obj" % (i, segment_str, offset_str, tag_str, id_str)
+            filename = f"{bin_to_hex(id)}.obj"
             print("Dumping", filename)
             with open(filename, "wb") as fd:
                 fd.write(data)
@@ -132,16 +128,15 @@ class DebugMixIn:
         cdata = repository.get(ids[0])
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
-        state = None
-        i = 0
+        marker = None
         while True:
-            ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state)  # must use on-disk order scanning here
+            ids = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
             if not ids:
                 break
+            marker = ids[-1]
             for id in ids:
                 cdata = repository.get(id)
-                decrypt_dump(i, id, cdata)
-                i += 1
+                decrypt_dump(id, cdata)
         print("Done.")
 
     @with_repository(manifest=False)
@@ -179,14 +174,15 @@ class DebugMixIn:
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
 
-        state = None
+        marker = None
         last_data = b""
         last_id = None
         i = 0
         while True:
-            ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state)  # must use on-disk order scanning here
+            ids = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
             if not ids:
                 break
+            marker = ids[-1]
             for id in ids:
                 cdata = repository.get(id)
                 _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)

+ 3 - 2
src/borg/archiver/rcompress_cmd.py

@@ -24,12 +24,13 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
     recompress_ids = []
     compr_keys = stats["compr_keys"] = set()
     compr_wanted = ctype, clevel, olevel
-    state = None
+    marker = None
     chunks_limit = 1000
     while True:
-        chunk_ids, state = repository.scan(limit=chunks_limit, state=state)
+        chunk_ids = repository.list(limit=chunks_limit, marker=marker)
         if not chunk_ids:
             break
+        marker = chunk_ids[-1]
         for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)):
             meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE)
             compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)

+ 1 - 1
src/borg/constants.py

@@ -78,7 +78,7 @@ BUFSIZE = 10 * 1024 * 1024
 # MAX_DATA_SIZE or it will trigger the check for that.
 MAX_ARCHIVES = 400000
 
-# repo.list() / .scan() result count limit the borg client uses
+# repo.list() result count limit the borg client uses
 LIST_SCAN_LIMIT = 100000
 
 FD_MAX_AGE = 4 * 60  # 4 minutes

+ 0 - 5
src/borg/remote.py

@@ -145,7 +145,6 @@ class RepositoryServer:  # pragma: no cover
         "flags_many",
         "get",
         "list",
-        "scan",
         "negotiate",
         "open",
         "close",
@@ -993,10 +992,6 @@ class RemoteRepository:
     def list(self, limit=None, marker=None, mask=0, value=0):
         """actual remoting is done via self.call in the @api decorator"""
 
-    @api(since=parse_version("2.0.0b3"))
-    def scan(self, limit=None, state=None):
-        """actual remoting is done via self.call in the @api decorator"""
-
     @api(since=parse_version("2.0.0b2"))
     def flags(self, id, mask=0xFFFFFFFF, value=None):
         """actual remoting is done via self.call in the @api decorator"""

+ 0 - 6
src/borg/remote3.py

@@ -147,7 +147,6 @@ class RepositoryServer:  # pragma: no cover
         "flags_many",
         "get",
         "list",
-        "scan",
         "negotiate",
         "open",
         "close",
@@ -168,7 +167,6 @@ class RepositoryServer:  # pragma: no cover
         "destroy",
         "get",
         "list",
-        "scan",
         "negotiate",
         "open",
         "close",
@@ -1031,10 +1029,6 @@ class RemoteRepository3:
     def list(self, limit=None, marker=None, mask=0, value=0):
         """actual remoting is done via self.call in the @api decorator"""
 
-    @api(since=parse_version("2.0.0b3"))
-    def scan(self, limit=None, state=None):
-        """actual remoting is done via self.call in the @api decorator"""
-
     def get(self, id, read_data=True):
         for resp in self.get_many([id], read_data=read_data):
             return resp

+ 5 - 63
src/borg/repository.py

@@ -1217,61 +1217,6 @@ class Repository:
             self.index = self.open_index(self.get_transaction_id())
         return [id_ for id_, _ in islice(self.index.iteritems(marker=marker, mask=mask, value=value), limit)]
 
-    def scan(self, limit=None, state=None):
-        """
-        list (the next) <limit> chunk IDs from the repository - in on-disk order, so that a client
-        fetching data in this order does linear reads and reuses stuff from disk cache.
-
-        state can either be None (initially, when starting to scan) or the object
-        returned from a previous scan call (meaning "continue scanning").
-
-        returns: list of chunk ids, state
-
-        We rely on repository.check() has run already (either now or some time before) and that:
-
-        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
-        - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
-        - the repository segments are valid (no CRC errors).
-          if we encounter CRC errors in segment entry headers, rest of segment is skipped.
-        """
-        if limit is not None and limit < 1:
-            raise ValueError("please use limit > 0 or limit = None")
-        transaction_id = self.get_transaction_id()
-        if not self.index:
-            self.index = self.open_index(transaction_id)
-        # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
-        start_segment, start_offset, end_segment = state if state is not None else (0, 0, transaction_id)
-        ids, segment, offset = [], 0, 0
-        # we only scan up to end_segment == transaction_id to scan only **committed** chunks,
-        # avoiding scanning into newly written chunks.
-        for segment, filename in self.io.segment_iterator(start_segment, end_segment):
-            # the start_offset we potentially got from state is only valid for the start_segment we also got
-            # from there. in case the segment file vanished meanwhile, the segment_iterator might never
-            # return a segment/filename corresponding to the start_segment and we must start from offset 0 then.
-            start_offset = start_offset if segment == start_segment else 0
-            obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False)
-            while True:
-                try:
-                    tag, id, offset, size, _ = next(obj_iterator)
-                except (StopIteration, IntegrityError):
-                    # either end-of-segment or an error - we can not seek to objects at
-                    # higher offsets than one that has an error in the header fields.
-                    break
-                if start_offset > 0:
-                    # we are using a state != None and it points to the last object we have already
-                    # returned in the previous scan() call - thus, we need to skip this one object.
-                    # also, for the next segment, we need to start at offset 0.
-                    start_offset = 0
-                    continue
-                if tag in (TAG_PUT2, TAG_PUT):
-                    in_index = self.index.get(id)
-                    if in_index and (in_index.segment, in_index.offset) == (segment, offset):
-                        # we have found an existing and current object
-                        ids.append(id)
-                        if len(ids) == limit:
-                            return ids, (segment, offset, end_segment)
-        return ids, (segment, offset, end_segment)
-
     def flags(self, id, mask=0xFFFFFFFF, value=None):
         """
         query and optionally set flags
@@ -1625,7 +1570,7 @@ class LoggedIO:
         fd.seek(0)
         return fd.read(MAGIC_LEN)
 
-    def iter_objects(self, segment, offset=0, read_data=True):
+    def iter_objects(self, segment, read_data=True):
         """
         Return object iterator for *segment*.
 
@@ -1634,14 +1579,11 @@ class LoggedIO:
         The iterator returns five-tuples of (tag, key, offset, size, data).
         """
         fd = self.get_fd(segment)
+        offset = 0
         fd.seek(offset)
-        if offset == 0:
-            # we are touching this segment for the first time, check the MAGIC.
-            # Repository.scan() calls us with segment > 0 when it continues an ongoing iteration
-            # from a marker position - but then we have checked the magic before already.
-            if fd.read(MAGIC_LEN) != MAGIC:
-                raise IntegrityError(f"Invalid segment magic [segment {segment}, offset {0}]")
-            offset = MAGIC_LEN
+        if fd.read(MAGIC_LEN) != MAGIC:
+            raise IntegrityError(f"Invalid segment magic [segment {segment}, offset {offset}]")
+        offset = MAGIC_LEN
         header = fd.read(self.header_fmt.size)
         while header:
             size, tag, key, data = self._read(

+ 0 - 14
src/borg/repository3.py

@@ -307,20 +307,6 @@ class Repository3:
             return ids[:limit]
         return ids
 
-    def scan(self, limit=None, state=None):
-        """
-        list (the next) <limit> chunk IDs from the repository.
-
-        state can either be None (initially, when starting to scan) or the object
-        returned from a previous scan call (meaning "continue scanning").
-
-        returns: list of chunk ids, state
-        """
-        # we only have store.list() anyway, so just call .list() from here.
-        ids = self.list(limit=limit, marker=state)
-        state = ids[-1] if ids else None
-        return ids, state
-
     def get(self, id, read_data=True):
         self._lock_refresh()
         id_hex = bin_to_hex(id)

+ 1 - 1
src/borg/testsuite/archiver/debug_cmds.py

@@ -45,7 +45,7 @@ def test_debug_dump_repo_objs(archivers, request):
     with changedir("output"):
         output = cmd(archiver, "debug", "dump-repo-objs")
     output_dir = sorted(os.listdir("output"))
-    assert len(output_dir) > 0 and output_dir[0].startswith("00000000_")
+    assert len(output_dir) > 0
     assert "Done." in output
 
 

+ 3 - 2
src/borg/testsuite/archiver/rcompress_cmd.py

@@ -15,11 +15,12 @@ def test_rcompress(archiver):
         repository = Repository3(archiver.repository_path, exclusive=True)
         with repository:
             manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-            state = None
+            marker = None
             while True:
-                ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state)
+                ids = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
                 if not ids:
                     break
+                marker = ids[-1]
                 for id in ids:
                     chunk = repository.get(id, read_data=True)
                     meta, data = manifest.repo_objs.parse(

+ 0 - 50
src/borg/testsuite/repository.py

@@ -222,56 +222,6 @@ def test_list(repo_fixtures, request):
         assert len(repository.list(limit=50)) == 50
 
 
-def test_scan(repo_fixtures, request):
-    with get_repository_from_fixture(repo_fixtures, request) as repository:
-        for x in range(100):
-            repository.put(H(x), fchunk(b"SOMEDATA"))
-        repository.commit(compact=False)
-        ids, _ = repository.scan()
-        assert len(ids) == 100
-        first_half, state = repository.scan(limit=50)
-        assert len(first_half) == 50
-        assert first_half == ids[:50]
-        second_half, _ = repository.scan(state=state)
-        assert len(second_half) == 50
-        assert second_half == ids[50:]
-        # check result order == on-disk order (which is hash order)
-        for x in range(100):
-            assert ids[x] == H(x)
-
-
-def test_scan_modify(repo_fixtures, request):
-    with get_repository_from_fixture(repo_fixtures, request) as repository:
-        for x in range(100):
-            repository.put(H(x), fchunk(b"ORIGINAL"))
-        repository.commit(compact=False)
-        # now we scan, read and modify chunks at the same time
-        count = 0
-        ids, _ = repository.scan()
-        for id in ids:
-            # scan results are in same order as we put the chunks into the repo (into the segment file)
-            assert id == H(count)
-            chunk = repository.get(id)
-            # check that we **only** get data that was committed when we started scanning
-            # and that we do not run into the new data we put into the repo.
-            assert pdchunk(chunk) == b"ORIGINAL"
-            count += 1
-            repository.put(id, fchunk(b"MODIFIED"))
-        assert count == 100
-        repository.commit()
-
-        # now we have committed all the modified chunks, and **only** must get the modified ones.
-        count = 0
-        ids, _ = repository.scan()
-        for id in ids:
-            # scan results are in same order as we put the chunks into the repo (into the segment file)
-            assert id == H(count)
-            chunk = repository.get(id)
-            assert pdchunk(chunk) == b"MODIFIED"
-            count += 1
-        assert count == 100
-
-
 def test_max_data_size(repo_fixtures, request):
     with get_repository_from_fixture(repo_fixtures, request) as repository:
         max_data = b"x" * (MAX_DATA_SIZE - RepoObj.obj_header.size)

+ 0 - 14
src/borg/testsuite/repository3.py

@@ -137,20 +137,6 @@ def test_list(repo_fixtures, request):
         assert len(repository.list(limit=50)) == 50
 
 
-def test_scan(repo_fixtures, request):
-    with get_repository_from_fixture(repo_fixtures, request) as repository:
-        for x in range(100):
-            repository.put(H(x), fchunk(b"SOMEDATA"))
-        ids, _ = repository.scan()
-        assert len(ids) == 100
-        first_half, state = repository.scan(limit=50)
-        assert len(first_half) == 50
-        assert first_half == ids[:50]
-        second_half, _ = repository.scan(state=state)
-        assert len(second_half) == 50
-        assert second_half == ids[50:]
-
-
 def test_max_data_size(repo_fixtures, request):
     with get_repository_from_fixture(repo_fixtures, request) as repository:
         max_data = b"x" * (MAX_DATA_SIZE - RepoObj.obj_header.size)