Explorar o código

repository index: add payload size (==csize) and flags to NSIndex entries

This saves some segment file random IO that was previously necessary
just to determine the size of the data that is about to be deleted.

Keep old one as NSIndex1 for old borg compatibility.
Choose NSIndex or NSIndex1 based on repo index layout from HashHeader.

For an old repo index, repo.get(key) returns (segment, offset, None, None).
Thomas Waldmann hai 3 anos
pai
achega
3ce3fbcdff

+ 96 - 3
src/borg/hashindex.pyx

@@ -77,6 +77,20 @@ assert UINT32_MAX == 2**32-1
 assert _MAX_VALUE % 2 == 1
 
 
+def hashindex_variant(fn):
+    """peek into an index file and find out what it is"""
+    with open(fn, 'rb') as f:
+        hh = f.read(18)  # len(HashHeader)
+    magic = hh[0:8]
+    if magic == b'BORG_IDX':
+        key_size = hh[16]
+        value_size = hh[17]
+        return f'k{key_size}_v{value_size}'
+    if magic == b'12345678':  # used by unit tests
+        return 'k32_v16'  # just return the current variant
+    raise ValueError(f'unknown hashindex format, magic: {magic!r}')
+
+
 @cython.internal
 cdef class IndexBase:
     cdef HashIndex *index
@@ -196,9 +210,12 @@ cdef class FuseVersionsIndex(IndexBase):
         return hashindex_get(self.index, <unsigned char *>key) != NULL
 
 
+NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra')
+
+
 cdef class NSIndex(IndexBase):
 
-    value_size = 8
+    value_size = 16
 
     def __getitem__(self, key):
         assert len(key) == self.key_size
@@ -207,15 +224,17 @@ cdef class NSIndex(IndexBase):
             raise KeyError(key)
         cdef uint32_t segment = _le32toh(data[0])
         assert segment <= _MAX_VALUE, "maximum number of segments reached"
-        return segment, _le32toh(data[1])
+        return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]), _le32toh(data[3]))
 
     def __setitem__(self, key, value):
         assert len(key) == self.key_size
-        cdef uint32_t[2] data
+        cdef uint32_t[4] data
         cdef uint32_t segment = value[0]
         assert segment <= _MAX_VALUE, "maximum number of segments reached"
         data[0] = _htole32(segment)
         data[1] = _htole32(value[1])
+        data[2] = _htole32(value[2])
+        data[3] = _htole32(value[3])
         if not hashindex_set(self.index, <unsigned char *>key, data):
             raise Exception('hashindex_set failed')
 
@@ -248,6 +267,80 @@ cdef class NSKeyIterator:
     cdef int key_size
     cdef int exhausted
 
+    def __cinit__(self, key_size):
+        self.key = NULL
+        self.key_size = key_size
+        self.exhausted = 0
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.exhausted:
+            raise StopIteration
+        self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
+        if not self.key:
+            self.exhausted = 1
+            raise StopIteration
+        cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
+        cdef uint32_t segment = _le32toh(value[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return ((<char *>self.key)[:self.key_size],
+                NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]), _le32toh(value[3])))
+
+
+cdef class NSIndex1(IndexBase):  # legacy borg 1.x
+
+    value_size = 8
+
+    def __getitem__(self, key):
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
+        if not data:
+            raise KeyError(key)
+        cdef uint32_t segment = _le32toh(data[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return segment, _le32toh(data[1])
+
+    def __setitem__(self, key, value):
+        assert len(key) == self.key_size
+        cdef uint32_t[2] data
+        cdef uint32_t segment = value[0]
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        data[0] = _htole32(segment)
+        data[1] = _htole32(value[1])
+        if not hashindex_set(self.index, <unsigned char *>key, data):
+            raise Exception('hashindex_set failed')
+
+    def __contains__(self, key):
+        cdef uint32_t segment
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
+        if data != NULL:
+            segment = _le32toh(data[0])
+            assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return data != NULL
+
+    def iteritems(self, marker=None):
+        cdef const unsigned char *key
+        iter = NSKeyIterator1(self.key_size)
+        iter.idx = self
+        iter.index = self.index
+        if marker:
+            key = hashindex_get(self.index, <unsigned char *>marker)
+            if marker is None:
+                raise IndexError
+            iter.key = key - self.key_size
+        return iter
+
+
+cdef class NSKeyIterator1:  # legacy borg 1.x
+    cdef NSIndex1 idx
+    cdef HashIndex *index
+    cdef const unsigned char *key
+    cdef int key_size
+    cdef int exhausted
+
     def __cinit__(self, key_size):
         self.key = NULL
         self.key_size = key_size

+ 60 - 41
src/borg/repository.py

@@ -13,7 +13,7 @@ from functools import partial
 from itertools import islice
 
 from .constants import *  # NOQA
-from .hashindex import NSIndex
+from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant
 from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
 from .helpers import Location
 from .helpers import ProgressIndicatorPercent
@@ -52,6 +52,18 @@ MAX_TAG_ID = 15
 FreeSpace = partial(defaultdict, int)
 
 
+def header_size(tag):
+    if tag == TAG_PUT2:
+        size = LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE
+    elif tag == TAG_PUT or tag == TAG_DELETE:
+        size = LoggedIO.HEADER_ID_SIZE
+    elif tag == TAG_COMMIT:
+        size = LoggedIO.header_fmt.size
+    else:
+        raise ValueError(f"unsupported tag: {tag!r}")
+    return size
+
+
 class Repository:
     """
     Filesystem based transactional key value store
@@ -525,10 +537,14 @@ class Repository:
         if transaction_id is None:
             return NSIndex()
         index_path = os.path.join(self.path, 'index.%d' % transaction_id)
+        variant = hashindex_variant(index_path)
         integrity_data = self._read_integrity(transaction_id, 'index')
         try:
             with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
-                return NSIndex.read(fd)
+                if variant == 'k32_v16':
+                    return NSIndex.read(fd)
+                if variant == 'k32_v8':  # legacy
+                    return NSIndex1.read(fd)
         except (ValueError, OSError, FileIntegrityError) as exc:
             logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
             os.unlink(index_path)
@@ -798,14 +814,14 @@ class Repository:
                 if tag == TAG_COMMIT:
                     continue
                 in_index = self.index.get(key)
-                is_index_object = in_index == (segment, offset)
+                is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset)
                 if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
                     try:
                         new_segment, offset = self.io.write_put(key, data, raise_full=True)
                     except LoggedIO.SegmentFull:
                         complete_xfer()
                         new_segment, offset = self.io.write_put(key, data)
-                    self.index[key] = new_segment, offset
+                    self.index[key] = NSIndexEntry(new_segment, offset, len(data), in_index.extra)
                     segments.setdefault(new_segment, 0)
                     segments[new_segment] += 1
                     segments[segment] -= 1
@@ -821,10 +837,7 @@ class Repository:
                         # do not remove entry with empty shadowed_segments list here,
                         # it is needed for shadowed_put_exists code (see below)!
                         pass
-                    if tag == TAG_PUT2:
-                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
-                    elif tag == TAG_PUT:
-                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
+                    self.storage_quota_use -= header_size(tag) + len(data)
                 elif tag == TAG_DELETE and not in_index:
                     # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                     # therefore we do not drop the delete, but write it to a current segment.
@@ -919,27 +932,26 @@ class Repository:
             if tag in (TAG_PUT2, TAG_PUT):
                 try:
                     # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
-                    s, _ = self.index[key]
-                    self.compact[s] += size
-                    self.segments[s] -= 1
+                    in_index = self.index[key]
+                    self.compact[in_index.segment] += header_size(tag) + size
+                    self.segments[in_index.segment] -= 1
                 except KeyError:
                     pass
-                self.index[key] = segment, offset
+                self.index[key] = NSIndexEntry(segment, offset, size, 0)
                 self.segments[segment] += 1
-                self.storage_quota_use += size  # note: size already includes the put_header_fmt overhead
+                self.storage_quota_use += header_size(tag) + size
             elif tag == TAG_DELETE:
                 try:
                     # if the deleted PUT is not in the index, there is nothing to clean up
-                    s, offset = self.index.pop(key)
+                    in_index = self.index.pop(key)
                 except KeyError:
                     pass
                 else:
-                    if self.io.segment_exists(s):
+                    if self.io.segment_exists(in_index.segment):
                         # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
                         # is already gone, then it was already compacted.
-                        self.segments[s] -= 1
-                        size = self.io.read(s, offset, key, read_data=False)
-                        self.compact[s] += size
+                        self.segments[in_index.segment] -= 1
+                        self.compact[in_index.segment] += header_size(tag) + in_index.size
             elif tag == TAG_COMMIT:
                 continue
             else:
@@ -968,12 +980,13 @@ class Repository:
         self.compact[segment] = 0
         for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
             if tag in (TAG_PUT2, TAG_PUT):
-                if self.index.get(key, (-1, -1)) != (segment, offset):
+                in_index = self.index.get(key)
+                if not in_index or (in_index.segment, in_index.offset) != (segment, offset):
                     # This PUT is superseded later
-                    self.compact[segment] += size
+                    self.compact[segment] += header_size(tag) + size
             elif tag == TAG_DELETE:
                 # The outcome of the DELETE has been recorded in the PUT branch already
-                self.compact[segment] += size
+                self.compact[segment] += header_size(tag) + size
 
     def check(self, repair=False, save_space=False, max_duration=0):
         """Check repository consistency
@@ -1169,7 +1182,7 @@ class Repository:
             self.index = self.open_index(transaction_id)
         at_start = marker is None
         # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
-        start_segment, start_offset = (0, 0) if at_start else self.index[marker]
+        start_segment, start_offset, _, _ = (0, 0, 0, 0) if at_start else self.index[marker]
         result = []
         for segment, filename in self.io.segment_iterator(start_segment):
             obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
@@ -1186,19 +1199,21 @@ class Repository:
                     # also, for the next segment, we need to start at offset 0.
                     start_offset = 0
                     continue
-                if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
-                    # we have found an existing and current object
-                    result.append(id)
-                    if len(result) == limit:
-                        return result
+                if tag in (TAG_PUT2, TAG_PUT):
+                    in_index = self.index.get(id)
+                    if in_index and (in_index.segment, in_index.offset) == (segment, offset):
+                        # we have found an existing and current object
+                        result.append(id)
+                        if len(result) == limit:
+                            return result
         return result
 
     def get(self, id):
         if not self.index:
             self.index = self.open_index(self.get_transaction_id())
         try:
-            segment, offset = self.index[id]
-            return self.io.read(segment, offset, id)
+            in_index = NSIndexEntry(*((self.index[id] + (None, None))[:4]))  # legacy: no size/extra
+            return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size)
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
 
@@ -1215,7 +1230,7 @@ class Repository:
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
-            segment, offset = self.index[id]
+            in_index = self.index[id]
         except KeyError:
             pass
         else:
@@ -1223,12 +1238,12 @@ class Repository:
             # we do not want to update the shadow_index here, because
             # we know already that we will PUT to this id, so it will
             # be in the repo index (and we won't need it in the shadow_index).
-            self._delete(id, segment, offset, update_shadow_index=False)
+            self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False)
         segment, offset = self.io.write_put(id, data)
-        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
+        self.storage_quota_use += header_size(TAG_PUT2) + len(data)
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
-        self.index[id] = segment, offset
+        self.index[id] = NSIndexEntry(segment, offset, len(data), 0)
         if self.storage_quota and self.storage_quota_use > self.storage_quota:
             self.transaction_doomed = self.StorageQuotaExceeded(
                 format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
@@ -1243,22 +1258,21 @@ class Repository:
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
-            segment, offset = self.index.pop(id)
+            in_index = self.index.pop(id)
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
         # if we get here, there is an object with this id in the repo,
         # we write a DEL here that shadows the respective PUT.
         # after the delete, the object is not in the repo index any more,
         # for the compaction code, we need to update the shadow_index in this case.
-        self._delete(id, segment, offset, update_shadow_index=True)
+        self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True)
 
-    def _delete(self, id, segment, offset, *, update_shadow_index):
+    def _delete(self, id, segment, offset, size, *, update_shadow_index):
         # common code used by put and delete
         if update_shadow_index:
             self.shadow_index.setdefault(id, []).append(segment)
         self.segments[segment] -= 1
-        size = self.io.read(segment, offset, id, read_data=False)
-        self.compact[segment] += size
+        self.compact[segment] += header_size(TAG_PUT2) + size
         segment, size = self.io.write_delete(id)
         self.compact[segment] += size
         self.segments.setdefault(segment, 0)
@@ -1515,7 +1529,8 @@ class LoggedIO:
             if include_data:
                 yield tag, key, offset, data
             else:
-                yield tag, key, offset, size
+                yield tag, key, offset, size - header_size(tag)  # corresponds to len(data)
+            assert size >= 0
             offset += size
             # we must get the fd via get_fd() here again as we yielded to our caller and it might
             # have triggered closing of the fd we had before (e.g. by calling io.read() for
@@ -1580,7 +1595,7 @@ class LoggedIO:
             h.update(d)
         return h.digest()
 
-    def read(self, segment, offset, id, read_data=True):
+    def read(self, segment, offset, id, read_data=True, *, expected_size=None):
         """
         Read entry from *segment* at *offset* with *id*.
         If read_data is False the size of the entry is returned instead.
@@ -1596,7 +1611,11 @@ class LoggedIO:
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
-        return data if read_data else size
+        data_size_from_header = size - header_size(tag)
+        if expected_size is not None and expected_size != data_size_from_header:
+            raise IntegrityError(f'size from repository index: {expected_size} != '
+                                 f'size from entry header: {data_size_from_header}')
+        return data if read_data else data_size_from_header
 
     def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
         """

+ 24 - 24
src/borg/testsuite/hashindex.py

@@ -87,8 +87,8 @@ class HashIndexTestCase(BaseTestCase):
         del idx
 
     def test_nsindex(self):
-        self._generic_test(NSIndex, lambda x: (x, x),
-                           '85f72b036c692c8266e4f51ccf0cff2147204282b5e316ae508d30a448d88fef')
+        self._generic_test(NSIndex, lambda x: (x, x, x, x),
+                           'c9fe5878800d2a0691b667c665a00d4a186e204e891076d6b109016940742bed')
 
     def test_chunkindex(self):
         self._generic_test(ChunkIndex, lambda x: (x, x),
@@ -102,7 +102,7 @@ class HashIndexTestCase(BaseTestCase):
             initial_size = os.path.getsize(filepath)
             self.assert_equal(len(idx), 0)
             for x in range(n):
-                idx[H(x)] = x, x
+                idx[H(x)] = x, x, x, x
             idx.write(filepath)
             assert initial_size < os.path.getsize(filepath)
             for x in range(n):
@@ -114,7 +114,7 @@ class HashIndexTestCase(BaseTestCase):
     def test_iteritems(self):
         idx = NSIndex()
         for x in range(100):
-            idx[H(x)] = x, x
+            idx[H(x)] = x, x, x, x
         iterator = idx.iteritems()
         all = list(iterator)
         self.assert_equal(len(all), 100)
@@ -514,9 +514,9 @@ class NSIndexTestCase(BaseTestCase):
     def test_nsindex_segment_limit(self):
         idx = NSIndex()
         with self.assert_raises(AssertionError):
-            idx[H(1)] = NSIndex.MAX_VALUE + 1, 0
+            idx[H(1)] = NSIndex.MAX_VALUE + 1, 0, 0, 0
         assert H(1) not in idx
-        idx[H(2)] = NSIndex.MAX_VALUE, 0
+        idx[H(2)] = NSIndex.MAX_VALUE, 0, 0, 0
         assert H(2) in idx
 
 
@@ -531,38 +531,38 @@ class IndexCorruptionTestCase(BaseTestCase):
 
         from struct import pack
 
-        def HH(x, y):
-            # make some 32byte long thing that depends on x and y.
-            # same x will mean a collision in the hashtable as bucket index is computed from
-            # first 4 bytes. giving a specific x targets bucket index x.
-            # y is to create different keys and does not go into the bucket index calculation.
-            # so, same x + different y --> collision
-            return pack('<IIQQQ', x, y, 0, 0, 0)  # 2 * 4 + 3 * 8 == 32
+        def HH(w, x, y, z):
+            # make some 32byte long thing that depends on w, x, y, z.
+            # same w will mean a collision in the hashtable as bucket index is computed from
+            # first 4 bytes. giving a specific w targets bucket index w.
+            # x is to create different keys and does not go into the bucket index calculation.
+            # so, same w + different x --> collision
+            return pack('<IIIIIIII', w, x, y, z, 0, 0, 0, 0)  # 8 * 4 == 32
 
         idx = NSIndex()
 
         # create lots of colliding entries
-        for y in range(700):  # stay below max load to not trigger resize
-            idx[HH(0, y)] = (0, y)
+        for x in range(700):  # stay below max load to not trigger resize
+            idx[HH(0, x, 0, 0)] = (0, x, 0, 0)
 
-        assert idx.size() == 1031 * 40 + 18  # 1031 buckets + header
+        assert idx.size() == 1031 * 48 + 18  # 1031 buckets + header
 
         # delete lots of the collisions, creating lots of tombstones
-        for y in range(400):  # stay above min load to not trigger resize
-            del idx[HH(0, y)]
+        for x in range(400):  # stay above min load to not trigger resize
+            del idx[HH(0, x, 0, 0)]
 
         # create lots of colliding entries, within the not yet used part of the hashtable
-        for y in range(330):  # stay below max load to not trigger resize
-            # at y == 259 a resize will happen due to going beyond max EFFECTIVE load
+        for x in range(330):  # stay below max load to not trigger resize
+            # at x == 259 a resize will happen due to going beyond max EFFECTIVE load
             # if the bug is present, that element will be inserted at the wrong place.
             # and because it will be at the wrong place, it can not be found again.
-            idx[HH(600, y)] = 600, y
+            idx[HH(600, x, 0, 0)] = 600, x, 0, 0
 
         # now check if hashtable contents is as expected:
 
-        assert [idx.get(HH(0, y)) for y in range(400, 700)] == [(0, y) for y in range(400, 700)]
+        assert [idx.get(HH(0, x, 0, 0)) for x in range(400, 700)] == [(0, x, 0, 0) for x in range(400, 700)]
 
-        assert [HH(0, y) in idx for y in range(400)] == [False for y in range(400)]  # deleted entries
+        assert [HH(0, x, 0, 0) in idx for x in range(400)] == [False for x in range(400)]  # deleted entries
 
         # this will fail at HH(600, 259) if the bug is present.
-        assert [idx.get(HH(600, y)) for y in range(330)] == [(600, y) for y in range(330)]
+        assert [idx.get(HH(600, x, 0, 0)) for x in range(330)] == [(600, x, 0, 0) for x in range(330)]

+ 1 - 1
src/borg/testsuite/repository.py

@@ -714,7 +714,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
 
     def corrupt_object(self, id_):
         idx = self.open_index()
-        segment, offset = idx[H(id_)]
+        segment, offset, _, _ = idx[H(id_)]
         with open(os.path.join(self.tmppath, 'repository', 'data', '0', str(segment)), 'r+b') as fd:
             fd.seek(offset)
             fd.write(b'BOOM')