Browse Source

Merge pull request #1682 from ThomasWaldmann/repo-list-in-order

WIP: repo.scan - list keys in disk-order
TW 9 years ago
parent
commit
1030660e71
4 changed files with 82 additions and 6 deletions
  1. 16 5
      src/borg/archive.py
  2. 4 0
      src/borg/remote.py
  3. 44 1
      src/borg/repository.py
  4. 18 0
      src/borg/testsuite/repository.py

+ 16 - 5
src/borg/archive.py

@@ -1041,12 +1041,18 @@ class ArchiveChecker:
 
     def verify_data(self):
         logger.info('Starting cryptographic data integrity verification...')
-        count = len(self.chunks)
+        chunks_count_index = len(self.chunks)
+        chunks_count_segments = 0
         errors = 0
         defect_chunks = []
-        pi = ProgressIndicatorPercent(total=count, msg="Verifying data %6.2f%%", step=0.01)
-        for chunk_infos in chunkit(self.chunks.iteritems(), 100):
-            chunk_ids = [chunk_id for chunk_id, _ in chunk_infos]
+        pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01)
+        marker = None
+        while True:
+            chunk_ids = self.repository.scan(limit=100, marker=marker)
+            if not chunk_ids:
+                break
+            chunks_count_segments += len(chunk_ids)
+            marker = chunk_ids[-1]
             chunk_data_iter = self.repository.get_many(chunk_ids)
             chunk_ids_revd = list(reversed(chunk_ids))
             while chunk_ids_revd:
@@ -1074,6 +1080,10 @@ class ArchiveChecker:
                         logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
                         defect_chunks.append(chunk_id)
         pi.finish()
+        if chunks_count_index != chunks_count_segments:
+            logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
+            logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
+                         chunks_count_index, chunks_count_segments)
         if defect_chunks:
             if self.repair:
                 # if we kill the defect chunk here, subsequent actions within this "borg check"
@@ -1106,7 +1116,8 @@ class ArchiveChecker:
                 for defect_chunk in defect_chunks:
                     logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
         log = logger.error if errors else logger.info
-        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', count, errors)
+        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
+            chunks_count_segments, errors)
 
     def rebuild_manifest(self):
         """Rebuild the manifest object if it is missing

+ 4 - 0
src/borg/remote.py

@@ -62,6 +62,7 @@ class RepositoryServer:  # pragma: no cover
         'destroy',
         'get',
         'list',
+        'scan',
         'negotiate',
         'open',
         'put',
@@ -467,6 +468,9 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
     def list(self, limit=None, marker=None):
         return self.call('list', limit, marker)
 
+    def scan(self, limit=None, marker=None):
+        return self.call('scan', limit, marker)
+
     def get(self, id_):
         for resp in self.get_many([id_]):
             return resp

+ 44 - 1
src/borg/repository.py

@@ -2,7 +2,7 @@ import errno
 import os
 import shutil
 import struct
-from binascii import unhexlify
+from binascii import hexlify, unhexlify
 from collections import defaultdict
 from configparser import ConfigParser
 from datetime import datetime
@@ -750,10 +750,53 @@ class Repository:
         return id in self.index
 
     def list(self, limit=None, marker=None):
+        """
+        list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
+        """
         if not self.index:
             self.index = self.open_index(self.get_transaction_id())
         return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
 
+    def scan(self, limit=None, marker=None):
+        """
+        list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
+        fetching data in this order does linear reads and reuses stuff from disk cache.
+
+        We rely on repository.check() has run already (either now or some time before) and that:
+        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
+        - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
+        - the repository segments are valid (no CRC errors).
+          if we encounter CRC errors in segment entry headers, rest of segment is skipped.
+        """
+        if limit is not None and limit < 1:
+            raise ValueError('please use limit > 0 or limit = None')
+        if not self.index:
+            transaction_id = self.get_transaction_id()
+            self.index = self.open_index(transaction_id)
+        at_start = marker is None
+        # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
+        marker_segment, marker_offset = (0, 0) if at_start else self.index[marker]
+        result = []
+        for segment, filename in self.io.segment_iterator():
+            if segment < marker_segment:
+                continue
+            obj_iterator = self.io.iter_objects(segment, read_data=False, include_data=False)
+            while True:
+                try:
+                    tag, id, offset, size = next(obj_iterator)
+                except (StopIteration, IntegrityError):
+                    # either end-of-segment or an error - we can not seek to objects at
+                    # higher offsets than one that has an error in the header fields
+                    break
+                if segment == marker_segment and offset <= marker_offset:
+                    continue
+                if tag == TAG_PUT and (segment, offset) == self.index.get(id):
+                    # we have found an existing and current object
+                    result.append(id)
+                    if len(result) == limit:
+                        return result
+        return result
+
     def get(self, id_):
         if not self.index:
             self.index = self.open_index(self.get_transaction_id())

+ 18 - 0
src/borg/testsuite/repository.py

@@ -133,6 +133,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
     def test_list(self):
         for x in range(100):
             self.repository.put(H(x), b'SOMEDATA')
+        self.repository.commit()
         all = self.repository.list()
         self.assert_equal(len(all), 100)
         first_half = self.repository.list(limit=50)
@@ -143,6 +144,23 @@ class RepositoryTestCase(RepositoryTestCaseBase):
         self.assert_equal(second_half, all[50:])
         self.assert_equal(len(self.repository.list(limit=50)), 50)
 
+    def test_scan(self):
+        for x in range(100):
+            self.repository.put(H(x), b'SOMEDATA')
+        self.repository.commit()
+        all = self.repository.scan()
+        assert len(all) == 100
+        first_half = self.repository.scan(limit=50)
+        assert len(first_half) == 50
+        assert first_half == all[:50]
+        second_half = self.repository.scan(marker=first_half[-1])
+        assert len(second_half) == 50
+        assert second_half == all[50:]
+        assert len(self.repository.scan(limit=50)) == 50
+        # check result order == on-disk order (which is hash order)
+        for x in range(100):
+            assert all[x] == H(x)
+
     def test_max_data_size(self):
         max_data = b'x' * MAX_DATA_SIZE
         self.repository.put(H(0), max_data)