浏览代码

cache: compact hashindex before writing to chunks.archive.d

Marian Beermann 8 年之前
父节点
当前提交
6e011b9354
共有 6 个文件被更改,包括 131 次插入34 次删除
  1. 67 9
      src/borg/_hashindex.c
  2. 33 14
      src/borg/cache.py
  3. 11 7
      src/borg/hashindex.pyx
  4. 1 1
      src/borg/helpers.py
  5. 2 2
      src/borg/remote.py
  6. 17 1
      src/borg/testsuite/hashindex.py

+ 67 - 9
src/borg/_hashindex.c

@@ -109,10 +109,11 @@ static int hash_sizes[] = {
 #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno))
 
 #ifdef Py_PYTHON_H
-static HashIndex *hashindex_read(PyObject *file_py);
+static HashIndex *hashindex_read(PyObject *file_py, int permit_compact);
 static void hashindex_write(HashIndex *index, PyObject *file_py);
 #endif
 
+static uint64_t hashindex_compact(HashIndex *index);
 static HashIndex *hashindex_init(int capacity, int key_size, int value_size);
 static const void *hashindex_get(HashIndex *index, const void *key);
 static int hashindex_set(HashIndex *index, const void *key, const void *value);
@@ -273,7 +274,7 @@ count_empty(HashIndex *index)
 
 #ifdef Py_PYTHON_H
 static HashIndex *
-hashindex_read(PyObject *file_py)
+hashindex_read(PyObject *file_py, int permit_compact)
 {
     Py_ssize_t length, buckets_length, bytes_read;
     Py_buffer header_buffer;
@@ -393,14 +394,16 @@ hashindex_read(PyObject *file_py)
     }
     index->buckets = index->buckets_buffer.buf;
 
-    index->min_empty = get_min_empty(index->num_buckets);
-    index->num_empty = count_empty(index);
+    if(!permit_compact) {
+        index->min_empty = get_min_empty(index->num_buckets);
+        index->num_empty = count_empty(index);
 
-    if(index->num_empty < index->min_empty) {
-        /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
-        if(!hashindex_resize(index, index->num_buckets)) {
-            PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
-            goto fail_free_buckets;
+        if(index->num_empty < index->min_empty) {
+            /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
+            if(!hashindex_resize(index, index->num_buckets)) {
+                PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
+                goto fail_free_buckets;
+            }
         }
     }
 
@@ -620,6 +623,61 @@ hashindex_next_key(HashIndex *index, const void *key)
     return BUCKET_ADDR(index, idx);
 }
 
+static uint64_t
+hashindex_compact(HashIndex *index)
+{
+    int idx = 0;
+    int start_idx;
+    int begin_used_idx;
+    int empty_slot_count, count, buckets_to_copy;
+    int compact_tail_idx = 0;
+    uint64_t saved_size = (index->num_buckets - index->num_entries) * (uint64_t)index->bucket_size;
+
+    if(index->num_buckets - index->num_entries == 0) {
+        /* already compact */
+        return 0;
+    }
+
+    while(idx < index->num_buckets) {
+        /* Phase 1: Find some empty slots */
+        start_idx = idx;
+        while((BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) && idx < index->num_buckets) {
+            idx++;
+        }
+
+        /* everything from start_idx to idx is empty or deleted */
+        count = empty_slot_count = idx - start_idx;
+        begin_used_idx = idx;
+
+        if(!empty_slot_count) {
+            memcpy(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, idx), index->bucket_size);
+            idx++;
+            compact_tail_idx++;
+            continue;
+        }
+
+        /* Phase 2: Find some non-empty/non-deleted slots we can move to the compact tail */
+
+        while(!(BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) && empty_slot_count && idx < index->num_buckets) {
+            idx++;
+            empty_slot_count--;
+        }
+
+        buckets_to_copy = count - empty_slot_count;
+
+        if(!buckets_to_copy) {
+            /* Nothing to move, reached end of the buckets array with no used buckets. */
+            break;
+        }
+
+        memcpy(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, begin_used_idx), buckets_to_copy * index->bucket_size);
+        compact_tail_idx += buckets_to_copy;
+    }
+
+    index->num_buckets = index->num_entries;
+    return saved_size;
+}
+
 static int
 hashindex_len(HashIndex *index)
 {

+ 33 - 14
src/borg/cache.py

@@ -536,6 +536,10 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         archive indexes.
         """
         archive_path = os.path.join(self.path, 'chunks.archive.d')
+        # Instrumentation
+        processed_item_metadata_bytes = 0
+        processed_item_metadata_chunks = 0
+        compact_chunks_archive_saved_space = 0
 
         def mkpath(id, suffix=''):
             id_hex = bin_to_hex(id)
@@ -545,8 +549,10 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         def cached_archives():
             if self.do_cache:
                 fns = os.listdir(archive_path)
-                # filenames with 64 hex digits == 256bit
-                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
+                # filenames with 64 hex digits == 256bit,
+                # or compact indices which are 64 hex digits + ".compact"
+                return set(unhexlify(fn) for fn in fns if len(fn) == 64) | \
+                       set(unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact'))
             else:
                 return set()
 
@@ -558,13 +564,21 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 cleanup_cached_archive(id)
 
         def cleanup_cached_archive(id):
-            os.unlink(mkpath(id))
             try:
+                os.unlink(mkpath(id))
                 os.unlink(mkpath(id) + '.integrity')
             except FileNotFoundError:
                 pass
+            try:
+                os.unlink(mkpath(id, suffix='.compact'))
+                os.unlink(mkpath(id, suffix='.compact') + '.integrity')
+            except FileNotFoundError:
+                pass
 
         def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
+            nonlocal processed_item_metadata_bytes
+            nonlocal processed_item_metadata_chunks
+            nonlocal compact_chunks_archive_saved_space
             csize, data = decrypted_repository.get(archive_id)
             chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
@@ -573,9 +587,12 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             sync = CacheSynchronizer(chunk_idx)
             for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
                 chunk_idx.add(item_id, 1, len(data), csize)
+                processed_item_metadata_bytes += len(data)
+                processed_item_metadata_chunks += 1
                 sync.feed(data)
             if self.do_cache:
-                fn = mkpath(archive_id)
+                compact_chunks_archive_saved_space += chunk_idx.compact()
+                fn = mkpath(archive_id, suffix='.compact')
                 fn_tmp = mkpath(archive_id, suffix='.tmp')
                 try:
                     with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
@@ -612,7 +629,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             # due to hash table "resonance".
             master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
             if archive_ids:
-                chunk_idx = None
+                chunk_idx = None if not self.do_cache else ChunkIndex(master_index_capacity)
                 pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                               msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                               msgid='cache.sync')
@@ -624,8 +641,12 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                             archive_chunk_idx_path = mkpath(archive_id)
                             logger.info("Reading cached archive chunk index for %s ...", archive_name)
                             try:
-                                with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
-                                    archive_chunk_idx = ChunkIndex.read(fd)
+                                try:
+                                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + '.compact', write=False) as fd:
+                                        archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
+                                except FileNotFoundError:
+                                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
+                                        archive_chunk_idx = ChunkIndex.read(fd)
                             except FileIntegrityError as fie:
                                 logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
                                 # Delete it and fetch a new index
@@ -639,18 +660,16 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                             archive_chunk_idx = ChunkIndex()
                             fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                         logger.info("Merging into master chunks index ...")
-                        if chunk_idx is None:
-                            # we just use the first archive's idx as starting point,
-                            # to avoid growing the hash table from 0 size and also
-                            # to save 1 merge call.
-                            chunk_idx = archive_chunk_idx
-                        else:
-                            chunk_idx.merge(archive_chunk_idx)
+                        chunk_idx.merge(archive_chunk_idx)
                     else:
                         chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                         logger.info('Fetching archive index for %s ...', archive_name)
                         fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
                 pi.finish()
+                logger.debug('Cache sync: processed %s bytes (%d chunks) of metadata',
+                             format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks)
+                logger.debug('Cache sync: compact chunks.archive.d storage saved %s bytes',
+                             format_file_size(compact_chunks_archive_saved_space))
             logger.info('Done.')
             return chunk_idx
 

+ 11 - 7
src/borg/hashindex.pyx

@@ -8,14 +8,14 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
 from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
 
-API_VERSION = '1.1_02'
+API_VERSION = '1.1_03'
 
 
 cdef extern from "_hashindex.c":
     ctypedef struct HashIndex:
         pass
 
-    HashIndex *hashindex_read(object file_py) except *
+    HashIndex *hashindex_read(object file_py, int permit_compact) except *
     HashIndex *hashindex_init(int capacity, int key_size, int value_size)
     void hashindex_free(HashIndex *index)
     int hashindex_len(HashIndex *index)
@@ -25,6 +25,7 @@ cdef extern from "_hashindex.c":
     void *hashindex_next_key(HashIndex *index, void *key)
     int hashindex_delete(HashIndex *index, void *key)
     int hashindex_set(HashIndex *index, void *key, void *value)
+    uint64_t hashindex_compact(HashIndex *index)
     uint32_t _htole32(uint32_t v)
     uint32_t _le32toh(uint32_t v)
 
@@ -73,14 +74,14 @@ cdef class IndexBase:
     MAX_LOAD_FACTOR = HASH_MAX_LOAD
     MAX_VALUE = _MAX_VALUE
 
-    def __cinit__(self, capacity=0, path=None, key_size=32):
+    def __cinit__(self, capacity=0, path=None, key_size=32, permit_compact=False):
         self.key_size = key_size
         if path:
             if isinstance(path, (str, bytes)):
                 with open(path, 'rb') as fd:
-                    self.index = hashindex_read(fd)
+                    self.index = hashindex_read(fd, permit_compact)
             else:
-                self.index = hashindex_read(path)
+                self.index = hashindex_read(path, permit_compact)
             assert self.index, 'hashindex_read() returned NULL with no exception set'
         else:
             self.index = hashindex_init(capacity, self.key_size, self.value_size)
@@ -92,8 +93,8 @@ cdef class IndexBase:
             hashindex_free(self.index)
 
     @classmethod
-    def read(cls, path):
-        return cls(path=path)
+    def read(cls, path, permit_compact=False):
+        return cls(path=path, permit_compact=permit_compact)
 
     def write(self, path):
         if isinstance(path, (str, bytes)):
@@ -140,6 +141,9 @@ cdef class IndexBase:
         """Return size (bytes) of hash table."""
         return hashindex_size(self.index)
 
+    def compact(self):
+        return hashindex_compact(self.index)
+
 
 cdef class NSIndex(IndexBase):
 

+ 1 - 1
src/borg/helpers.py

@@ -126,7 +126,7 @@ def check_python():
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_02':
+    if hashindex.API_VERSION != '1.1_03':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError

+ 2 - 2
src/borg/remote.py

@@ -642,8 +642,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
             # in any case, we want to cleanly close the repo, even if the
             # rollback can not succeed (e.g. because the connection was
             # already closed) and raised another exception:
-            logger.debug('RemoteRepository: %d bytes sent, %d bytes received, %d messages sent',
-                         self.tx_bytes, self.rx_bytes, self.msgid)
+            logger.debug('RemoteRepository: %s bytes sent, %s bytes received, %d messages sent',
+                         format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid)
             self.close()
 
     @property

+ 17 - 1
src/borg/testsuite/hashindex.py

@@ -4,7 +4,7 @@ import os
 import tempfile
 import zlib
 
-from ..hashindex import NSIndex, ChunkIndex
+from ..hashindex import NSIndex, ChunkIndex, ChunkIndexEntry
 from .. import hashindex
 from ..crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from . import BaseTestCase
@@ -156,6 +156,22 @@ class HashIndexExtraTestCase(BaseTestCase):
         # the index should now be empty
         assert list(index.iteritems()) == []
 
+    def test_vacuum(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = 1, 100, 100
+        idx1[H(2)] = 2, 200, 200
+        idx1[H(3)] = 3, 300, 300
+        idx1.compact()
+        assert idx1.size() == 18 + 3 * (32 + 3 * 4)
+        #with self.assert_raises(KeyError):
+        #    idx1[H(1)]
+        data = list(idx1.iteritems())
+        print(data)
+        assert (H(1), ChunkIndexEntry(1, 100, 100)) in data
+        assert (H(2), ChunkIndexEntry(2, 200, 200)) in data
+        assert (H(3), ChunkIndexEntry(3, 300, 300)) in data
+
+
 
 class HashIndexSizeTestCase(BaseTestCase):
     def test_size_on_disk(self):