Переглянути джерело

Merge pull request #8411 from ThomasWaldmann/optimize-repo-list-usage

bugfix: remove superfluous repository.list() call
TW 8 місяців тому
батько
коміт
67b62b5989

+ 7 - 13
src/borg/archiver/compact_cmd.py

@@ -10,7 +10,7 @@ from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
 from ..remote import RemoteRepository
-from ..repository import Repository
+from ..repository import Repository, repo_lister
 
 from ..logger import create_logger
 
@@ -49,18 +49,12 @@ class ArchiveGarbageCollector:
     def get_repository_chunks(self) -> ChunkIndex:
         """Build a dict id -> size of all chunks present in the repository"""
         chunks = ChunkIndex()
-        marker = None
-        while True:
-            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                # we add this id to the chunks index, using refcount == 0, because
-                # we do not know yet whether it is actually referenced from some archives.
-                # we "abuse" the size field here. usually there is the plaintext size,
-                # but we use it for the size of the stored object here.
-                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        for id, stored_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT):
+            # we add this id to the chunks index, using refcount == 0, because
+            # we do not know yet whether it is actually referenced from some archives.
+            # we "abuse" the size field here. usually there is the plaintext size,
+            # but we use it for the size of the stored object here.
+            chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
         return chunks
 
     def save_chunk_index(self):

+ 27 - 39
src/borg/archiver/debug_cmd.py

@@ -15,7 +15,7 @@ from ..helpers import archivename_validator
 from ..helpers import CommandError, RTError
 from ..manifest import Manifest
 from ..platform import get_process_id
-from ..repository import Repository, LIST_SCAN_LIMIT
+from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister
 from ..repoobj import RepoObj
 
 from ._common import with_repository, Highlander
@@ -130,15 +130,9 @@ class DebugMixIn:
         cdata = repository.get(id)
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
-        marker = None
-        while True:
-            result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                cdata = repository.get(id)
-                decrypt_dump(id, cdata)
+        for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+            cdata = repository.get(id)
+            decrypt_dump(id, cdata)
         print("Done.")
 
     @with_repository(manifest=False)
@@ -177,38 +171,32 @@ class DebugMixIn:
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
 
-        marker = None
         last_data = b""
         last_id = None
         i = 0
-        while True:
-            result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                cdata = repository.get(id)
-                _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
-
-                # try to locate wanted sequence crossing the border of last_data and data
-                boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
-                if wanted in boundary_data:
-                    boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
-                    offset = boundary_data.find(wanted)
-                    info = "%d %s | %s" % (i, last_id.hex(), id.hex())
-                    print_finding(info, wanted, boundary_data, offset)
-
-                # try to locate wanted sequence in data
-                count = data.count(wanted)
-                if count:
-                    offset = data.find(wanted)  # only determine first occurrence's offset
-                    info = "%d %s #%d" % (i, id.hex(), count)
-                    print_finding(info, wanted, data, offset)
-
-                last_id, last_data = id, data
-                i += 1
-                if i % 10000 == 0:
-                    print("%d objects processed." % i)
+        for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+            cdata = repository.get(id)
+            _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
+
+            # try to locate wanted sequence crossing the border of last_data and data
+            boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
+            if wanted in boundary_data:
+                boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
+                offset = boundary_data.find(wanted)
+                info = "%d %s | %s" % (i, last_id.hex(), id.hex())
+                print_finding(info, wanted, boundary_data, offset)
+
+            # try to locate wanted sequence in data
+            count = data.count(wanted)
+            if count:
+                offset = data.find(wanted)  # only determine first occurrence's offset
+                info = "%d %s #%d" % (i, id.hex(), count)
+                print_finding(info, wanted, data, offset)
+
+            last_id, last_data = id, data
+            i += 1
+            if i % 10000 == 0:
+                print("%d objects processed." % i)
         print("Done.")
 
     @with_repository(manifest=False)

+ 9 - 17
src/borg/cache.py

@@ -31,7 +31,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import RemoteRepository
-from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound
+from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
 
 # chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -680,22 +680,14 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
     t0 = perf_counter()
-    num_requests = 0
     num_chunks = 0
-    marker = None
-    while True:
-        result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-        num_requests += 1
-        if not result:
-            break
-        marker = result[-1][0]
-        # The repo says it has these chunks, so we assume they are referenced chunks.
-        # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
-        # We do not know the plaintext size (!= stored_size), thus we set size = 0.
-        init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
-        for id, stored_size in result:
-            num_chunks += 1
-            chunks[id] = init_entry
+    # The repo says it has these chunks, so we assume they are referenced chunks.
+    # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
+    # We do not know the plaintext size (!= stored_size), thus we set size = 0.
+    init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+    for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+        num_chunks += 1
+        chunks[id] = init_entry
     # Cache does not contain the manifest.
     if not isinstance(repository, (Repository, RemoteRepository)):
         del chunks[Manifest.MANIFEST_ID]
@@ -703,7 +695,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
     # Protocol overhead is neglected in this calculation.
     speed = format_file_size(num_chunks * 34 / duration)
-    logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
+    logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
         write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)

+ 12 - 0
src/borg/repository.py

@@ -18,6 +18,18 @@ from .repoobj import RepoObj
 logger = create_logger(__name__)
 
 
+def repo_lister(repository, *, limit=None):
+    marker = None
+    finished = False
+    while not finished:
+        result = repository.list(limit=limit, marker=marker)
+        finished = (len(result) < limit) if limit is not None else (len(result) == 0)
+        if not finished:
+            marker = result[-1][0]
+        for id, stored_size in result:
+            yield id, stored_size
+
+
 class Repository:
     """borgstore based key value store"""
 

+ 19 - 25
src/borg/testsuite/archiver/repo_compress_cmd.py

@@ -1,7 +1,7 @@
 import os
 
 from ...constants import *  # NOQA
-from ...repository import Repository
+from ...repository import Repository, repo_lister
 from ...manifest import Manifest
 from ...compress import ZSTD, ZLIB, LZ4, CNONE
 from ...helpers import bin_to_hex
@@ -15,30 +15,24 @@ def test_repo_compress(archiver):
         repository = Repository(archiver.repository_path, exclusive=True)
         with repository:
             manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-            marker = None
-            while True:
-                result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-                if not result:
-                    break
-                marker = result[-1][0]
-                for id, _ in result:
-                    chunk = repository.get(id, read_data=True)
-                    meta, data = manifest.repo_objs.parse(
-                        id, chunk, ro_type=ROBJ_DONTCARE
-                    )  # will also decompress according to metadata
-                    m_olevel = meta.get("olevel", -1)
-                    m_psize = meta.get("psize", -1)
-                    print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
-                    # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
-                    # (desired compressed, lz4 compressed, not compressed).
-                    assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
-                    assert meta["clevel"] in (clevel, 255)  # LZ4 and CNONE has level 255
-                    if olevel != -1:  # we expect obfuscation
-                        assert "psize" in meta
-                        assert m_olevel == olevel
-                    else:
-                        assert "psize" not in meta
-                        assert "olevel" not in meta
+            for id, _ in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+                chunk = repository.get(id, read_data=True)
+                meta, data = manifest.repo_objs.parse(
+                    id, chunk, ro_type=ROBJ_DONTCARE
+                )  # will also decompress according to metadata
+                m_olevel = meta.get("olevel", -1)
+                m_psize = meta.get("psize", -1)
+                print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
+                # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
+                # (desired compressed, lz4 compressed, not compressed).
+                assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
+                assert meta["clevel"] in (clevel, 255)  # LZ4 and CNONE has level 255
+                if olevel != -1:  # we expect obfuscation
+                    assert "psize" in meta
+                    assert m_olevel == olevel
+                else:
+                    assert "psize" not in meta
+                    assert "olevel" not in meta
 
     create_regular_file(archiver.input_path, "file1", size=1024 * 10)
     create_regular_file(archiver.input_path, "file2", contents=os.urandom(1024 * 10))