
Merge pull request #2638 from enkore/f/fastcachesync-minify

Compact chunks.archive.d
enkore committed 8 years ago
parent
commit 13f396d5ad

src/borg/_hashindex.c (+68 -9)

@@ -109,10 +109,11 @@ static int hash_sizes[] = {
 #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno))
 
 #ifdef Py_PYTHON_H
-static HashIndex *hashindex_read(PyObject *file_py);
+static HashIndex *hashindex_read(PyObject *file_py, int permit_compact);
 static void hashindex_write(HashIndex *index, PyObject *file_py);
 #endif
 
+static uint64_t hashindex_compact(HashIndex *index);
 static HashIndex *hashindex_init(int capacity, int key_size, int value_size);
 static const void *hashindex_get(HashIndex *index, const void *key);
 static int hashindex_set(HashIndex *index, const void *key, const void *value);
@@ -273,7 +274,7 @@ count_empty(HashIndex *index)
 
 #ifdef Py_PYTHON_H
 static HashIndex *
-hashindex_read(PyObject *file_py)
+hashindex_read(PyObject *file_py, int permit_compact)
 {
     Py_ssize_t length, buckets_length, bytes_read;
     Py_buffer header_buffer;
@@ -393,14 +394,16 @@ hashindex_read(PyObject *file_py)
     }
     index->buckets = index->buckets_buffer.buf;
 
-    index->min_empty = get_min_empty(index->num_buckets);
-    index->num_empty = count_empty(index);
+    if(!permit_compact) {
+        index->min_empty = get_min_empty(index->num_buckets);
+        index->num_empty = count_empty(index);
 
-    if(index->num_empty < index->min_empty) {
-        /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
-        if(!hashindex_resize(index, index->num_buckets)) {
-            PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
-            goto fail_free_buckets;
+        if(index->num_empty < index->min_empty) {
+            /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
+            if(!hashindex_resize(index, index->num_buckets)) {
+                PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
+                goto fail_free_buckets;
+            }
         }
     }
 
@@ -620,6 +623,62 @@ hashindex_next_key(HashIndex *index, const void *key)
     return BUCKET_ADDR(index, idx);
 }
 
+static uint64_t
+hashindex_compact(HashIndex *index)
+{
+    int idx = 0;
+    int start_idx;
+    int begin_used_idx;
+    int empty_slot_count, count, buckets_to_copy;
+    int compact_tail_idx = 0;
+    uint64_t saved_size = (index->num_buckets - index->num_entries) * (uint64_t)index->bucket_size;
+
+    if(index->num_buckets - index->num_entries == 0) {
+        /* already compact */
+        return 0;
+    }
+
+    while(idx < index->num_buckets) {
+        /* Phase 1: Find some empty slots */
+        start_idx = idx;
+        while((BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) && idx < index->num_buckets) {
+            idx++;
+        }
+
+        /* everything from start_idx to idx is empty or deleted */
+        count = empty_slot_count = idx - start_idx;
+        begin_used_idx = idx;
+
+        if(!empty_slot_count) {
+            /* In case idx==compact_tail_idx, the areas overlap */
+            memmove(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, idx), index->bucket_size);
+            idx++;
+            compact_tail_idx++;
+            continue;
+        }
+
+        /* Phase 2: Find some non-empty/non-deleted slots we can move to the compact tail */
+
+        while(!(BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) && empty_slot_count && idx < index->num_buckets) {
+            idx++;
+            empty_slot_count--;
+        }
+
+        buckets_to_copy = count - empty_slot_count;
+
+        if(!buckets_to_copy) {
+            /* Nothing to move, reached end of the buckets array with no used buckets. */
+            break;
+        }
+
+        memcpy(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, begin_used_idx), buckets_to_copy * index->bucket_size);
+        compact_tail_idx += buckets_to_copy;
+    }
+
+    index->num_buckets = index->num_entries;
+    return saved_size;
+}
+
 static int
 hashindex_len(HashIndex *index)
 {

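For orientation, the compaction loop added above can be summarized in a few lines of Python: every used bucket is moved toward the front of the bucket array (skipping empty and tombstoned buckets), and the space behind the last used bucket is dropped and reported as saved. This is an illustrative sketch under simplified assumptions (a plain Python list instead of borg's mmapped bucket array), not the actual implementation.

EMPTY, DELETED = object(), object()   # stand-ins for borg's empty/tombstone bucket markers

def compact(buckets, bucket_size):
    """Move used buckets to the front; return the bytes saved by dropping the rest."""
    write = 0
    for bucket in buckets:
        if bucket is EMPTY or bucket is DELETED:
            continue
        buckets[write] = bucket       # may copy a bucket onto itself, which is harmless
        write += 1
    saved = (len(buckets) - write) * bucket_size
    del buckets[write:]               # afterwards num_buckets == num_entries
    return saved

# example: a ChunkIndex bucket is 32 key bytes + three 4-byte values = 44 bytes
table = ['a', EMPTY, DELETED, 'b', 'c', EMPTY]
assert compact(table, bucket_size=44) == 3 * 44 and table == ['a', 'b', 'c']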
src/borg/cache.py (+72 -31)

@@ -536,6 +536,10 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         archive indexes.
         """
         archive_path = os.path.join(self.path, 'chunks.archive.d')
+        # Instrumentation
+        processed_item_metadata_bytes = 0
+        processed_item_metadata_chunks = 0
+        compact_chunks_archive_saved_space = 0
 
         def mkpath(id, suffix=''):
             id_hex = bin_to_hex(id)
@@ -545,8 +549,10 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         def cached_archives():
             if self.do_cache:
                 fns = os.listdir(archive_path)
-                # filenames with 64 hex digits == 256bit
-                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
+                # filenames with 64 hex digits == 256bit,
+                # or compact indices which are 64 hex digits + ".compact"
+                return set(unhexlify(fn) for fn in fns if len(fn) == 64) | \
+                       set(unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact'))
             else:
                 return set()
 
@@ -557,14 +563,23 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             for id in ids:
                 cleanup_cached_archive(id)
 
-        def cleanup_cached_archive(id):
-            os.unlink(mkpath(id))
+        def cleanup_cached_archive(id, cleanup_compact=True):
             try:
+                os.unlink(mkpath(id))
                 os.unlink(mkpath(id) + '.integrity')
             except FileNotFoundError:
                 pass
+            if not cleanup_compact:
+                return
+            try:
+                os.unlink(mkpath(id, suffix='.compact'))
+                os.unlink(mkpath(id, suffix='.compact') + '.integrity')
+            except FileNotFoundError:
+                pass
 
         def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
+            nonlocal processed_item_metadata_bytes
+            nonlocal processed_item_metadata_chunks
             csize, data = decrypted_repository.get(archive_id)
             chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
@@ -573,18 +588,54 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             sync = CacheSynchronizer(chunk_idx)
             for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
                 chunk_idx.add(item_id, 1, len(data), csize)
+                processed_item_metadata_bytes += len(data)
+                processed_item_metadata_chunks += 1
                 sync.feed(data)
             if self.do_cache:
-                fn = mkpath(archive_id)
-                fn_tmp = mkpath(archive_id, suffix='.tmp')
+                write_archive_index(archive_id, chunk_idx)
+
+        def write_archive_index(archive_id, chunk_idx):
+            nonlocal compact_chunks_archive_saved_space
+            compact_chunks_archive_saved_space += chunk_idx.compact()
+            fn = mkpath(archive_id, suffix='.compact')
+            fn_tmp = mkpath(archive_id, suffix='.tmp')
+            try:
+                with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
+                                                  filename=bin_to_hex(archive_id) + '.compact') as fd:
+                    chunk_idx.write(fd)
+            except Exception:
+                os.unlink(fn_tmp)
+            else:
+                os.rename(fn_tmp, fn)
+
+        def read_archive_index(archive_id, archive_name):
+            archive_chunk_idx_path = mkpath(archive_id)
+            logger.info("Reading cached archive chunk index for %s ...", archive_name)
+            try:
                 try:
-                    with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
-                                                      filename=bin_to_hex(archive_id)) as fd:
-                        chunk_idx.write(fd)
-                except Exception:
-                    os.unlink(fn_tmp)
-                else:
-                    os.rename(fn_tmp, fn)
+                    # Attempt to load compact index first
+                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + '.compact', write=False) as fd:
+                        archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
+                    # In case a non-compact index exists, delete it.
+                    cleanup_cached_archive(archive_id, cleanup_compact=False)
+                    # Compact index read - return index, no conversion necessary (below).
+                    return archive_chunk_idx
+                except FileNotFoundError:
+                    # No compact index found, load non-compact index, and convert below.
+                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
+                        archive_chunk_idx = ChunkIndex.read(fd)
+            except FileIntegrityError as fie:
+                logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
+                # Delete corrupted index, set warning. A new index must be built.
+                cleanup_cached_archive(archive_id)
+                set_ec(EXIT_WARNING)
+                return None
+
+            # Convert to compact index. Delete the existing index first.
+            logger.debug('Found non-compact index for %s, converting to compact.', archive_name)
+            cleanup_cached_archive(archive_id)
+            write_archive_index(archive_id, archive_chunk_idx)
+            return archive_chunk_idx
 
         def get_archive_ids_to_names(archive_ids):
             # Pass once over all archives and build a mapping from ids to names.
@@ -612,7 +663,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             # due to hash table "resonance".
             master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
             if archive_ids:
-                chunk_idx = None
+                chunk_idx = None if not self.do_cache else ChunkIndex(master_index_capacity)
                 pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                               msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                               msgid='cache.sync')
@@ -621,17 +672,9 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                     pi.show(info=[remove_surrogates(archive_name)])
                     if self.do_cache:
                         if archive_id in cached_ids:
-                            archive_chunk_idx_path = mkpath(archive_id)
-                            logger.info("Reading cached archive chunk index for %s ...", archive_name)
-                            try:
-                                with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
-                                    archive_chunk_idx = ChunkIndex.read(fd)
-                            except FileIntegrityError as fie:
-                                logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
-                                # Delete it and fetch a new index
-                                cleanup_cached_archive(archive_id)
+                            archive_chunk_idx = read_archive_index(archive_id, archive_name)
+                            if archive_chunk_idx is None:
                                 cached_ids.remove(archive_id)
-                                set_ec(EXIT_WARNING)
                         if archive_id not in cached_ids:
                             # Do not make this an else branch; the FileIntegrityError exception handler
                             # above can remove *archive_id* from *cached_ids*.
@@ -639,18 +682,16 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                             archive_chunk_idx = ChunkIndex()
                             fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                         logger.info("Merging into master chunks index ...")
-                        if chunk_idx is None:
-                            # we just use the first archive's idx as starting point,
-                            # to avoid growing the hash table from 0 size and also
-                            # to save 1 merge call.
-                            chunk_idx = archive_chunk_idx
-                        else:
-                            chunk_idx.merge(archive_chunk_idx)
+                        chunk_idx.merge(archive_chunk_idx)
                     else:
                         chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                         logger.info('Fetching archive index for %s ...', archive_name)
                         fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
                 pi.finish()
+                logger.debug('Cache sync: processed %s bytes (%d chunks) of metadata',
+                             format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks)
+                logger.debug('Cache sync: compact chunks.archive.d storage saved %s bytes',
+                             format_file_size(compact_chunks_archive_saved_space))
             logger.info('Done.')
             return chunk_idx
 

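The read path above prefers the new "<archive id hex>.compact" file and transparently converts a legacy non-compact index when only that exists. A hypothetical helper sketching the lookup order (file names follow mkpath() above; the helper itself is not part of borg):

import os

def pick_archive_index(archive_dir, archive_id_hex):
    """Return (path, is_compact) in the same order read_archive_index() tries them."""
    compact_path = os.path.join(archive_dir, archive_id_hex + '.compact')
    if os.path.exists(compact_path):
        # compact index present: read it with permit_compact=True, nothing to convert
        return compact_path, True
    # legacy index: read it normally, then compact(), write the .compact file and
    # delete the old files (the conversion step at the end of read_archive_index())
    return os.path.join(archive_dir, archive_id_hex), False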
src/borg/hashindex.pyx (+11 -7)

@@ -9,14 +9,14 @@ from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
 from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
 
-API_VERSION = '1.1_02'
+API_VERSION = '1.1_03'
 
 
 cdef extern from "_hashindex.c":
     ctypedef struct HashIndex:
         pass
 
-    HashIndex *hashindex_read(object file_py) except *
+    HashIndex *hashindex_read(object file_py, int permit_compact) except *
     HashIndex *hashindex_init(int capacity, int key_size, int value_size)
     void hashindex_free(HashIndex *index)
     int hashindex_len(HashIndex *index)
@@ -26,6 +26,7 @@ cdef extern from "_hashindex.c":
     void *hashindex_next_key(HashIndex *index, void *key)
     int hashindex_delete(HashIndex *index, void *key)
     int hashindex_set(HashIndex *index, void *key, void *value)
+    uint64_t hashindex_compact(HashIndex *index)
     uint32_t _htole32(uint32_t v)
     uint32_t _le32toh(uint32_t v)
 
@@ -74,14 +75,14 @@ cdef class IndexBase:
     MAX_LOAD_FACTOR = HASH_MAX_LOAD
     MAX_VALUE = _MAX_VALUE
 
-    def __cinit__(self, capacity=0, path=None, key_size=32):
+    def __cinit__(self, capacity=0, path=None, key_size=32, permit_compact=False):
         self.key_size = key_size
         if path:
             if isinstance(path, (str, bytes)):
                 with open(path, 'rb') as fd:
-                    self.index = hashindex_read(fd)
+                    self.index = hashindex_read(fd, permit_compact)
             else:
-                self.index = hashindex_read(path)
+                self.index = hashindex_read(path, permit_compact)
             assert self.index, 'hashindex_read() returned NULL with no exception set'
         else:
             self.index = hashindex_init(capacity, self.key_size, self.value_size)
@@ -93,8 +94,8 @@ cdef class IndexBase:
             hashindex_free(self.index)
 
     @classmethod
-    def read(cls, path):
-        return cls(path=path)
+    def read(cls, path, permit_compact=False):
+        return cls(path=path, permit_compact=permit_compact)
 
     def write(self, path):
         if isinstance(path, (str, bytes)):
@@ -141,6 +142,9 @@ cdef class IndexBase:
         """Return size (bytes) of hash table."""
         return hashindex_size(self.index)
 
+    def compact(self):
+        return hashindex_compact(self.index)
+
 
 cdef class NSIndex(IndexBase):
 

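Putting the new Cython API together, a hedged usage example (the file name is made up; the method names, the permit_compact flag and the (refcount, size, csize) value layout come from the diffs and tests in this PR):

from borg.hashindex import ChunkIndex

idx = ChunkIndex()
idx[bytes(32)] = 1, 100, 100              # 32-byte key -> (refcount, size, csize)
saved = idx.compact()                     # drop empty/deleted buckets, returns bytes saved
idx.write('example.compact')              # hypothetical file name

# A compacted index is no longer usable as a hash table for lookups; it is meant
# to be written out, read back with permit_compact=True, and merged:
master = ChunkIndex()
master.merge(ChunkIndex.read('example.compact', permit_compact=True))
assert master[bytes(32)] == (1, 100, 100)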
src/borg/helpers.py (+1 -1)

@@ -125,7 +125,7 @@ def check_python():
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_02':
+    if hashindex.API_VERSION != '1.1_03':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError

src/borg/remote.py (+2 -2)

@@ -652,8 +652,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
             # in any case, we want to cleanly close the repo, even if the
             # rollback can not succeed (e.g. because the connection was
             # already closed) and raised another exception:
-            logger.debug('RemoteRepository: %d bytes sent, %d bytes received, %d messages sent',
-                         self.tx_bytes, self.rx_bytes, self.msgid)
+            logger.debug('RemoteRepository: %s bytes sent, %s bytes received, %d messages sent',
+                         format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid)
             self.close()
 
     @property

src/borg/testsuite/archiver.py (+1 -1)

@@ -2944,7 +2944,7 @@ class ArchiverCorruptionTestCase(ArchiverTestCaseBase):
         chunks_archive = os.path.join(self.cache_path, 'chunks.archive.d')
         assert len(os.listdir(chunks_archive)) == 4  # two archives, one chunks cache and one .integrity file each
 
-        self.corrupt(os.path.join(chunks_archive, target_id))
+        self.corrupt(os.path.join(chunks_archive, target_id + '.compact'))
 
         # Trigger cache sync by changing the manifest ID in the cache config
         config_path = os.path.join(self.cache_path, 'config')

src/borg/testsuite/hashindex.py (+146 -1)

@@ -1,10 +1,11 @@
 import base64
 import hashlib
+import io
 import os
 import tempfile
 import zlib
 
-from ..hashindex import NSIndex, ChunkIndex
+from ..hashindex import NSIndex, ChunkIndex, ChunkIndexEntry
 from .. import hashindex
 from ..crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from . import BaseTestCase
@@ -18,6 +19,11 @@ def H(x):
     return bytes('%-0.32d' % x, 'ascii')
 
 
+def H2(x):
+    # like H(x), but with pseudo-random distribution of the output value
+    return hashlib.sha256(H(x)).digest()
+
+
 class HashIndexTestCase(BaseTestCase):
 
     def _generic_test(self, cls, make_value, sha):
@@ -341,6 +347,145 @@ class HashIndexIntegrityTestCase(HashIndexDataTestCase):
                     ChunkIndex.read(fd)
 
 
+class HashIndexCompactTestCase(HashIndexDataTestCase):
+    def index(self, num_entries, num_buckets):
+        index_data = io.BytesIO()
+        index_data.write(b'BORG_IDX')
+        # num_entries
+        index_data.write(num_entries.to_bytes(4, 'little'))
+        # num_buckets
+        index_data.write(num_buckets.to_bytes(4, 'little'))
+        # key_size
+        index_data.write((32).to_bytes(1, 'little'))
+        # value_size
+        index_data.write((3 * 4).to_bytes(1, 'little'))
+
+        self.index_data = index_data
+
+    def index_from_data(self):
+        self.index_data.seek(0)
+        index = ChunkIndex.read(self.index_data)
+        return index
+
+    def index_to_data(self, index):
+        data = io.BytesIO()
+        index.write(data)
+        return data.getvalue()
+
+    def index_from_data_compact_to_data(self):
+        index = self.index_from_data()
+        index.compact()
+        compact_index = self.index_to_data(index)
+        return compact_index
+
+    def write_entry(self, key, *values):
+        self.index_data.write(key)
+        for value in values:
+            self.index_data.write(value.to_bytes(4, 'little'))
+
+    def write_empty(self, key):
+        self.write_entry(key, 0xffffffff, 0, 0)
+
+    def write_deleted(self, key):
+        self.write_entry(key, 0xfffffffe, 0, 0)
+
+    def test_simple(self):
+        self.index(num_entries=3, num_buckets=6)
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_deleted(H2(1))
+        self.write_empty(H2(2))
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        self.write_empty(H2(5))
+
+        compact_index = self.index_from_data_compact_to_data()
+
+        self.index(num_entries=3, num_buckets=3)
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        assert compact_index == self.index_data.getvalue()
+
+    def test_first_empty(self):
+        self.index(num_entries=3, num_buckets=6)
+        self.write_deleted(H2(1))
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_empty(H2(2))
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        self.write_empty(H2(5))
+
+        compact_index = self.index_from_data_compact_to_data()
+
+        self.index(num_entries=3, num_buckets=3)
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        assert compact_index == self.index_data.getvalue()
+
+    def test_last_used(self):
+        self.index(num_entries=3, num_buckets=6)
+        self.write_deleted(H2(1))
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_empty(H2(2))
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_empty(H2(5))
+        self.write_entry(H2(4), 8, 9, 10)
+
+        compact_index = self.index_from_data_compact_to_data()
+
+        self.index(num_entries=3, num_buckets=3)
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        assert compact_index == self.index_data.getvalue()
+
+    def test_too_few_empty_slots(self):
+        self.index(num_entries=3, num_buckets=6)
+        self.write_deleted(H2(1))
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_empty(H2(2))
+        self.write_empty(H2(5))
+        self.write_entry(H2(4), 8, 9, 10)
+
+        compact_index = self.index_from_data_compact_to_data()
+
+        self.index(num_entries=3, num_buckets=3)
+        self.write_entry(H2(0), 1, 2, 3)
+        self.write_entry(H2(3), 5, 6, 7)
+        self.write_entry(H2(4), 8, 9, 10)
+        assert compact_index == self.index_data.getvalue()
+
+    def test_empty(self):
+        self.index(num_entries=0, num_buckets=6)
+        self.write_deleted(H2(1))
+        self.write_empty(H2(0))
+        self.write_deleted(H2(3))
+        self.write_empty(H2(2))
+        self.write_empty(H2(5))
+        self.write_deleted(H2(4))
+
+        compact_index = self.index_from_data_compact_to_data()
+
+        self.index(num_entries=0, num_buckets=0)
+        assert compact_index == self.index_data.getvalue()
+
+    def test_merge(self):
+        master = ChunkIndex()
+        idx1 = ChunkIndex()
+        idx1[H(1)] = 1, 100, 100
+        idx1[H(2)] = 2, 200, 200
+        idx1[H(3)] = 3, 300, 300
+        idx1.compact()
+        assert idx1.size() == 18 + 3 * (32 + 3 * 4)
+
+        master.merge(idx1)
+        assert master[H(1)] == (1, 100, 100)
+        assert master[H(2)] == (2, 200, 200)
+        assert master[H(3)] == (3, 300, 300)
+
+
 class NSIndexTestCase(BaseTestCase):
     def test_nsindex_segment_limit(self):
         idx = NSIndex()
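The constant 18 in the size assertion of test_merge above is the fixed index header that HashIndexCompactTestCase.index() writes: the 8-byte b'BORG_IDX' magic, two 4-byte counters (num_entries, num_buckets) and two 1-byte sizes (key_size, value_size). A quick worked check:

header = 8 + 4 + 4 + 1 + 1        # b'BORG_IDX', num_entries, num_buckets, key_size, value_size
bucket = 32 + 3 * 4               # 32-byte key + three 4-byte values per bucket
assert header + 3 * bucket == 18 + 3 * (32 + 3 * 4) == 150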