Ver Fonte

Merge pull request #891 from enkore/feature/cython-summarize

Add overflow checks to refcounting, port stuff to Cython etc
enkore há 9 anos atrás
pai
commit
dd95d0ebe8
5 ficheiros alterados com 325 adições e 85 exclusões
  1. 0 48
      borg/_hashindex.c
  2. 1 2
      borg/archive.py
  3. 3 5
      borg/cache.py
  4. 145 30
      borg/hashindex.pyx
  5. 176 0
      borg/testsuite/hashindex.py

+ 0 - 48
borg/_hashindex.c

@@ -439,51 +439,3 @@ hashindex_get_size(HashIndex *index)
 {
     return index->num_entries;
 }
-
-static void
-hashindex_summarize(HashIndex *index, long long *total_size, long long *total_csize,
-                    long long *total_unique_size, long long *total_unique_csize,
-                    long long *total_unique_chunks, long long *total_chunks)
-{
-    int64_t size = 0, csize = 0, unique_size = 0, unique_csize = 0, chunks = 0, unique_chunks = 0;
-    const int32_t *values;
-    void *key = NULL;
-
-    while((key = hashindex_next_key(index, key))) {
-        values = key + index->key_size;
-        unique_chunks++;
-        chunks += _le32toh(values[0]);
-        unique_size += _le32toh(values[1]);
-        unique_csize += _le32toh(values[2]);
-        size += (int64_t) _le32toh(values[0]) * _le32toh(values[1]);
-        csize += (int64_t) _le32toh(values[0]) * _le32toh(values[2]);
-    }
-    *total_size = size;
-    *total_csize = csize;
-    *total_unique_size = unique_size;
-    *total_unique_csize = unique_csize;
-    *total_unique_chunks = unique_chunks;
-    *total_chunks = chunks;
-}
-
-static void
-hashindex_add(HashIndex *index, const void *key, int32_t *other_values)
-{
-    int32_t *my_values = (int32_t *)hashindex_get(index, key);
-    if(my_values == NULL) {
-        hashindex_set(index, key, other_values);
-    } else {
-        *my_values += *other_values;
-    }
-}
-
-static void
-hashindex_merge(HashIndex *index, HashIndex *other)
-{
-    int32_t key_size = index->key_size;
-    void *key = NULL;
-
-    while((key = hashindex_next_key(other, key))) {
-        hashindex_add(index, key, key + key_size);
-    }
-}

+ 1 - 2
borg/archive.py

@@ -743,8 +743,7 @@ class ArchiveChecker:
 
         def add_reference(id_, size, csize, cdata=None):
             try:
-                count, _, _ = self.chunks[id_]
-                self.chunks[id_] = count + 1, size, csize
+                self.chunks.incref(id_)
             except KeyError:
                 assert cdata is not None
                 self.chunks[id_] = 1, size, csize

+ 3 - 5
borg/cache.py

@@ -379,21 +379,19 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
     def chunk_incref(self, id, stats):
         if not self.txn_active:
             self.begin_txn()
-        count, size, csize = self.chunks[id]
-        self.chunks[id] = (count + 1, size, csize)
+        count, size, csize = self.chunks.incref(id)
         stats.update(size, csize, False)
         return id, size, csize
 
     def chunk_decref(self, id, stats):
         if not self.txn_active:
             self.begin_txn()
-        count, size, csize = self.chunks[id]
-        if count == 1:
+        count, size, csize = self.chunks.decref(id)
+        if count == 0:
             del self.chunks[id]
             self.repository.delete(id, wait=False)
             stats.update(-size, -csize, True)
         else:
-            self.chunks[id] = (count - 1, size, csize)
             stats.update(-size, -csize, False)
 
     def file_known_and_unchanged(self, path_hash, st, ignore_inode=False):

+ 145 - 30
borg/hashindex.pyx

@@ -1,6 +1,9 @@
 # -*- coding: utf-8 -*-
 import os
 
+cimport cython
+from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
+
 API_VERSION = 2
 
 
@@ -11,9 +14,6 @@ cdef extern from "_hashindex.c":
     HashIndex *hashindex_read(char *path)
     HashIndex *hashindex_init(int capacity, int key_size, int value_size)
     void hashindex_free(HashIndex *index)
-    void hashindex_summarize(HashIndex *index, long long *total_size, long long *total_csize,
-                             long long *unique_size, long long *unique_csize,
-                             long long *total_unique_chunks, long long *total_chunks)
     void hashindex_merge(HashIndex *index, HashIndex *other)
     void hashindex_add(HashIndex *index, void *key, void *value)
     int hashindex_get_size(HashIndex *index)
@@ -22,13 +22,34 @@ cdef extern from "_hashindex.c":
     void *hashindex_next_key(HashIndex *index, void *key)
     int hashindex_delete(HashIndex *index, void *key)
     int hashindex_set(HashIndex *index, void *key, void *value)
-    int _htole32(int v)
-    int _le32toh(int v)
+    uint32_t _htole32(uint32_t v)
+    uint32_t _le32toh(uint32_t v)
 
 
 cdef _NoDefault = object()
 
-cimport cython
+"""
+The HashIndex is *not* a general purpose data structure. The value size must be at least 4 bytes, and these
+first bytes are used for in-band signalling in the data structure itself.
+
+The constant MAX_VALUE defines the valid range for these 4 bytes when interpreted as an uint32_t from 0
+to MAX_VALUE (inclusive). The following reserved values beyond MAX_VALUE are currently in use
+(byte order is LE)::
+
+    0xffffffff marks empty entries in the hashtable
+    0xfffffffe marks deleted entries in the hashtable
+
+None of the publicly available classes in this module will accept nor return a reserved value;
+AssertionError is raised instead.
+"""
+
+assert UINT32_MAX == 2**32-1
+
+# module-level constant because cdef's in classes can't have default values
+cdef uint32_t _MAX_VALUE = 2**32-1025
+MAX_VALUE = _MAX_VALUE
+
+assert _MAX_VALUE % 2 == 1
 
 @cython.internal
 cdef class IndexBase:
@@ -101,22 +122,30 @@ cdef class NSIndex(IndexBase):
 
     def __getitem__(self, key):
         assert len(key) == self.key_size
-        data = <int *>hashindex_get(self.index, <char *>key)
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
         if not data:
-            raise KeyError
-        return _le32toh(data[0]), _le32toh(data[1])
+            raise KeyError(key)
+        cdef uint32_t segment = _le32toh(data[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return segment, _le32toh(data[1])
 
     def __setitem__(self, key, value):
         assert len(key) == self.key_size
-        cdef int[2] data
-        data[0] = _htole32(value[0])
+        cdef uint32_t[2] data
+        cdef uint32_t segment = value[0]
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        data[0] = _htole32(segment)
         data[1] = _htole32(value[1])
         if not hashindex_set(self.index, <char *>key, data):
             raise Exception('hashindex_set failed')
 
     def __contains__(self, key):
+        cdef uint32_t segment
         assert len(key) == self.key_size
-        data = <int *>hashindex_get(self.index, <char *>key)
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
+        if data != NULL:
+            segment = _le32toh(data[0])
+            assert segment <= _MAX_VALUE, "maximum number of segments reached"
         return data != NULL
 
     def iteritems(self, marker=None):
@@ -149,25 +178,46 @@ cdef class NSKeyIterator:
         self.key = hashindex_next_key(self.index, <char *>self.key)
         if not self.key:
             raise StopIteration
-        cdef int *value = <int *>(self.key + self.key_size)
-        return (<char *>self.key)[:self.key_size], (_le32toh(value[0]), _le32toh(value[1]))
+        cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
+        cdef uint32_t segment = _le32toh(value[0])
+        assert segment <= _MAX_VALUE, "maximum number of segments reached"
+        return (<char *>self.key)[:self.key_size], (segment, _le32toh(value[1]))
 
 
 cdef class ChunkIndex(IndexBase):
+    """
+    Mapping of 32 byte keys to (refcount, size, csize), which are all 32-bit unsigned.
+
+    The reference count cannot overflow. If an overflow would occur, the refcount
+    is fixed to MAX_VALUE and will neither increase nor decrease by incref(), decref()
+    or add().
+
+    Prior signed 32-bit overflow is handled correctly for most cases: All values
+    from UINT32_MAX (2**32-1, inclusive) to MAX_VALUE (exclusive) are reserved and either
+    cause silent data loss (-1, -2) or will raise an AssertionError when accessed.
+    Other values are handled correctly. Note that previously the refcount could also reach
+    0 by *increasing* it.
+
+    Assigning refcounts in this reserved range is an invalid operation and raises AssertionError.
+    """
 
     value_size = 12
 
     def __getitem__(self, key):
         assert len(key) == self.key_size
-        data = <int *>hashindex_get(self.index, <char *>key)
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
         if not data:
-            raise KeyError
-        return _le32toh(data[0]), _le32toh(data[1]), _le32toh(data[2])
+            raise KeyError(key)
+        cdef uint32_t refcount = _le32toh(data[0])
+        assert refcount <= _MAX_VALUE
+        return refcount, _le32toh(data[1]), _le32toh(data[2])
 
     def __setitem__(self, key, value):
         assert len(key) == self.key_size
-        cdef int[3] data
-        data[0] = _htole32(value[0])
+        cdef uint32_t[3] data
+        cdef uint32_t refcount = value[0]
+        assert refcount <= _MAX_VALUE, "invalid reference count"
+        data[0] = _htole32(refcount)
         data[1] = _htole32(value[1])
         data[2] = _htole32(value[2])
         if not hashindex_set(self.index, <char *>key, data):
@@ -175,9 +225,38 @@ cdef class ChunkIndex(IndexBase):
 
     def __contains__(self, key):
         assert len(key) == self.key_size
-        data = <int *>hashindex_get(self.index, <char *>key)
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
+        if data != NULL:
+            assert data[0] <= _MAX_VALUE
         return data != NULL
 
+    def incref(self, key):
+        """Increase refcount for 'key', return (refcount, size, csize)"""
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
+        if not data:
+            raise KeyError(key)
+        cdef uint32_t refcount = _le32toh(data[0])
+        assert refcount <= _MAX_VALUE, "invalid reference count"
+        if refcount != _MAX_VALUE:
+            refcount += 1
+        data[0] = _htole32(refcount)
+        return refcount, _le32toh(data[1]), _le32toh(data[2])
+
+    def decref(self, key):
+        """Decrease refcount for 'key', return (refcount, size, csize)"""
+        assert len(key) == self.key_size
+        data = <uint32_t *>hashindex_get(self.index, <char *>key)
+        if not data:
+            raise KeyError(key)
+        cdef uint32_t refcount = _le32toh(data[0])
+        # Never decrease a reference count of zero
+        assert 0 < refcount <= _MAX_VALUE, "invalid reference count"
+        if refcount != _MAX_VALUE:
+            refcount -= 1
+        data[0] = _htole32(refcount)
+        return refcount, _le32toh(data[1]), _le32toh(data[2])
+
     def iteritems(self, marker=None):
         cdef const void *key
         iter = ChunkKeyIterator(self.key_size)
@@ -191,22 +270,56 @@ cdef class ChunkIndex(IndexBase):
         return iter
 
     def summarize(self):
-        cdef long long total_size, total_csize, unique_size, unique_csize, total_unique_chunks, total_chunks
-        hashindex_summarize(self.index, &total_size, &total_csize,
-                            &unique_size, &unique_csize,
-                            &total_unique_chunks, &total_chunks)
-        return total_size, total_csize, unique_size, unique_csize, total_unique_chunks, total_chunks
+        cdef uint64_t size = 0, csize = 0, unique_size = 0, unique_csize = 0, chunks = 0, unique_chunks = 0
+        cdef uint32_t *values
+        cdef uint32_t refcount
+        cdef void *key = NULL
+
+        while True:
+            key = hashindex_next_key(self.index, key)
+            if not key:
+                break
+            unique_chunks += 1
+            values = <uint32_t*> (key + self.key_size)
+            refcount = _le32toh(values[0])
+            assert refcount <= MAX_VALUE, "invalid reference count"
+            chunks += refcount
+            unique_size += _le32toh(values[1])
+            unique_csize += _le32toh(values[2])
+            size += <uint64_t> _le32toh(values[1]) * _le32toh(values[0])
+            csize += <uint64_t> _le32toh(values[2]) * _le32toh(values[0])
+
+        return size, csize, unique_size, unique_csize, unique_chunks, chunks
 
     def add(self, key, refs, size, csize):
         assert len(key) == self.key_size
-        cdef int[3] data
+        cdef uint32_t[3] data
         data[0] = _htole32(refs)
         data[1] = _htole32(size)
         data[2] = _htole32(csize)
-        hashindex_add(self.index, <char *>key, data)
+        self._add(<char*> key, data)
+
+    cdef _add(self, void *key, uint32_t *data):
+        cdef uint64_t refcount1, refcount2, result64
+        values = <uint32_t*> hashindex_get(self.index, key)
+        if values:
+            refcount1 = _le32toh(values[0])
+            refcount2 = _le32toh(data[0])
+            assert refcount1 <= _MAX_VALUE
+            assert refcount2 <= _MAX_VALUE
+            result64 = refcount1 + refcount2
+            values[0] = _htole32(min(result64, _MAX_VALUE))
+        else:
+            hashindex_set(self.index, key, data)
 
     def merge(self, ChunkIndex other):
-        hashindex_merge(self.index, other.index)
+        cdef void *key = NULL
+
+        while True:
+            key = hashindex_next_key(other.index, key)
+            if not key:
+                break
+            self._add(key, <uint32_t*> (key + self.key_size))
 
 
 cdef class ChunkKeyIterator:
@@ -226,5 +339,7 @@ cdef class ChunkKeyIterator:
         self.key = hashindex_next_key(self.index, <char *>self.key)
         if not self.key:
             raise StopIteration
-        cdef int *value = <int *>(self.key + self.key_size)
-        return (<char *>self.key)[:self.key_size], (_le32toh(value[0]), _le32toh(value[1]), _le32toh(value[2]))
+        cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
+        cdef uint32_t refcount = _le32toh(value[0])
+        assert refcount <= MAX_VALUE, "invalid reference count"
+        return (<char *>self.key)[:self.key_size], (refcount, _le32toh(value[1]), _le32toh(value[2]))

+ 176 - 0
borg/testsuite/hashindex.py

@@ -1,8 +1,13 @@
+import base64
 import hashlib
 import os
+import struct
 import tempfile
+import zlib
 
+import pytest
 from ..hashindex import NSIndex, ChunkIndex
+from .. import hashindex
 from . import BaseTestCase
 
 
@@ -100,3 +105,174 @@ class HashIndexTestCase(BaseTestCase):
         assert idx1[H(2)] == (7, 200, 200)
         assert idx1[H(3)] == (3, 300, 300)
         assert idx1[H(4)] == (6, 400, 400)
+
+    def test_chunkindex_summarize(self):
+        idx = ChunkIndex()
+        idx[H(1)] = 1, 1000, 100
+        idx[H(2)] = 2, 2000, 200
+        idx[H(3)] = 3, 3000, 300
+
+        size, csize, unique_size, unique_csize, unique_chunks, chunks = idx.summarize()
+        assert size == 1000 + 2 * 2000 + 3 * 3000
+        assert csize == 100 + 2 * 200 + 3 * 300
+        assert unique_size == 1000 + 2000 + 3000
+        assert unique_csize == 100 + 200 + 300
+        assert chunks == 1 + 2 + 3
+        assert unique_chunks == 3
+
+
+class HashIndexRefcountingTestCase(BaseTestCase):
+    def test_chunkindex_limit(self):
+        idx = ChunkIndex()
+        idx[H(1)] = hashindex.MAX_VALUE - 1, 1, 2
+
+        # 5 is arbitray, any number of incref/decrefs shouldn't move it once it's limited
+        for i in range(5):
+            # first incref to move it to the limit
+            refcount, *_ = idx.incref(H(1))
+            assert refcount == hashindex.MAX_VALUE
+        for i in range(5):
+            refcount, *_ = idx.decref(H(1))
+            assert refcount == hashindex.MAX_VALUE
+
+    def _merge(self, refcounta, refcountb):
+        def merge(refcount1, refcount2):
+            idx1 = ChunkIndex()
+            idx1[H(1)] = refcount1, 1, 2
+            idx2 = ChunkIndex()
+            idx2[H(1)] = refcount2, 1, 2
+            idx1.merge(idx2)
+            refcount, *_ = idx1[H(1)]
+            return refcount
+        result = merge(refcounta, refcountb)
+        # check for commutativity
+        assert result == merge(refcountb, refcounta)
+        return result
+
+    def test_chunkindex_merge_limit1(self):
+        # Check that it does *not* limit at MAX_VALUE - 1
+        # (MAX_VALUE is odd)
+        half = hashindex.MAX_VALUE // 2
+        assert self._merge(half, half) == hashindex.MAX_VALUE - 1
+
+    def test_chunkindex_merge_limit2(self):
+        # 3000000000 + 2000000000 > MAX_VALUE
+        assert self._merge(3000000000, 2000000000) == hashindex.MAX_VALUE
+
+    def test_chunkindex_merge_limit3(self):
+        # Crossover point: both addition and limit semantics will yield the same result
+        half = hashindex.MAX_VALUE // 2
+        assert self._merge(half + 1, half) == hashindex.MAX_VALUE
+
+    def test_chunkindex_merge_limit4(self):
+        # Beyond crossover, result of addition would be 2**31
+        half = hashindex.MAX_VALUE // 2
+        assert self._merge(half + 2, half) == hashindex.MAX_VALUE
+        assert self._merge(half + 1, half + 1) == hashindex.MAX_VALUE
+
+    def test_chunkindex_add(self):
+        idx1 = ChunkIndex()
+        idx1.add(H(1), 5, 6, 7)
+        assert idx1[H(1)] == (5, 6, 7)
+        idx1.add(H(1), 1, 0, 0)
+        assert idx1[H(1)] == (6, 6, 7)
+
+    def test_incref_limit(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = (hashindex.MAX_VALUE, 6, 7)
+        idx1.incref(H(1))
+        refcount, *_ = idx1[H(1)]
+        assert refcount == hashindex.MAX_VALUE
+
+    def test_decref_limit(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = hashindex.MAX_VALUE, 6, 7
+        idx1.decref(H(1))
+        refcount, *_ = idx1[H(1)]
+        assert refcount == hashindex.MAX_VALUE
+
+    def test_decref_zero(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = 0, 0, 0
+        with pytest.raises(AssertionError):
+            idx1.decref(H(1))
+
+    def test_incref_decref(self):
+        idx1 = ChunkIndex()
+        idx1.add(H(1), 5, 6, 7)
+        assert idx1[H(1)] == (5, 6, 7)
+        idx1.incref(H(1))
+        assert idx1[H(1)] == (6, 6, 7)
+        idx1.decref(H(1))
+        assert idx1[H(1)] == (5, 6, 7)
+
+    def test_setitem_raises(self):
+        idx1 = ChunkIndex()
+        with pytest.raises(AssertionError):
+            idx1[H(1)] = hashindex.MAX_VALUE + 1, 0, 0
+
+    def test_keyerror(self):
+        idx = ChunkIndex()
+        with pytest.raises(KeyError):
+            idx.incref(H(1))
+        with pytest.raises(KeyError):
+            idx.decref(H(1))
+        with pytest.raises(KeyError):
+            idx[H(1)]
+        with pytest.raises(OverflowError):
+            idx.add(H(1), -1, 0, 0)
+
+
+class HashIndexDataTestCase(BaseTestCase):
+    # This bytestring was created with 1.0-maint at c2f9533
+    HASHINDEX = b'eJzt0L0NgmAUhtHLT0LDEI6AuAEhMVYmVnSuYefC7AB3Aj9KNedJbnfyFne6P67P27w0EdG1Eac+Cm1ZybAsy7Isy7Isy7Isy7I' \
+                b'sy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7Isy7LsL9nhc+cqTZ' \
+                b'3XlO2Ys++Du5fX+l1/YFmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVmWZVn2/+0O2rYccw=='
+
+    def _serialize_hashindex(self, idx):
+        with tempfile.TemporaryDirectory() as tempdir:
+            file = os.path.join(tempdir, 'idx')
+            idx.write(file)
+            with open(file, 'rb') as f:
+                return self._pack(f.read())
+
+    def _deserialize_hashindex(self, bytestring):
+        with tempfile.TemporaryDirectory() as tempdir:
+            file = os.path.join(tempdir, 'idx')
+            with open(file, 'wb') as f:
+                f.write(self._unpack(bytestring))
+            return ChunkIndex.read(file)
+
+    def _pack(self, bytestring):
+        return base64.b64encode(zlib.compress(bytestring))
+
+    def _unpack(self, bytestring):
+        return zlib.decompress(base64.b64decode(bytestring))
+
+    def test_identical_creation(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = 1, 2, 3
+        idx1[H(2)] = 2**31 - 1, 0, 0
+        idx1[H(3)] = 4294962296, 0, 0  # 4294962296 is -5000 interpreted as an uint32_t
+
+        assert self._serialize_hashindex(idx1) == self.HASHINDEX
+
+    def test_read_known_good(self):
+        idx1 = self._deserialize_hashindex(self.HASHINDEX)
+        assert idx1[H(1)] == (1, 2, 3)
+        assert idx1[H(2)] == (2**31 - 1, 0, 0)
+        assert idx1[H(3)] == (4294962296, 0, 0)
+
+        idx2 = ChunkIndex()
+        idx2[H(3)] = 2**32 - 123456, 6, 7
+        idx1.merge(idx2)
+        assert idx1[H(3)] == (hashindex.MAX_VALUE, 0, 0)
+
+
+def test_nsindex_segment_limit():
+    idx = NSIndex()
+    with pytest.raises(AssertionError):
+        idx[H(1)] = hashindex.MAX_VALUE + 1, 0
+    assert H(1) not in idx
+    idx[H(2)] = hashindex.MAX_VALUE, 0
+    assert H(2) in idx