浏览代码

CacheSynchronizer

Marian Beermann 8 年之前
父节点
当前提交
740898d83b
共有 5 个文件被更改,包括 198 次插入15 次删除
  1. 1 1
      setup.py
  2. 159 0
      src/borg/_cache.c
  3. 3 9
      src/borg/cache.py
  4. 34 4
      src/borg/hashindex.pyx
  5. 1 1
      src/borg/helpers.py

+ 1 - 1
setup.py

@@ -600,7 +600,7 @@ if not on_rtd:
     ext_modules += [
     Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros),
     Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros),
-    Extension('borg.hashindex', [hashindex_source]),
+    Extension('borg.hashindex', [hashindex_source], libraries=['msgpackc']),
     Extension('borg.item', [item_source]),
     Extension('borg.algorithms.chunker', [chunker_source]),
     Extension('borg.algorithms.checksums', [checksums_source]),

+ 159 - 0
src/borg/_cache.c

@@ -0,0 +1,159 @@
+
+#include <msgpack.h>
+
+// 2**32 - 1025
+#define _MAX_VALUE ( (uint32_t) 4294966271 )
+
+#define MIN(x, y) ((x) < (y) ? (x): (y))
+
+typedef struct {
+    HashIndex *chunks;
+
+    msgpack_unpacker unpacker;
+    msgpack_unpacked unpacked;
+    const char *error;
+} CacheSyncCtx;
+
+static CacheSyncCtx *
+cache_sync_init(HashIndex *chunks)
+{
+    CacheSyncCtx *ctx;
+    if (!(ctx = malloc(sizeof(CacheSyncCtx)))) {
+        return NULL;
+    }
+
+    ctx->chunks = chunks;
+    ctx->error = NULL;
+
+    if(!msgpack_unpacker_init(&ctx->unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE)) {
+        free(ctx);
+        return NULL;
+    }
+
+    msgpack_unpacked_init(&ctx->unpacked);
+
+    return ctx;
+}
+
+static void
+cache_sync_free(CacheSyncCtx *ctx)
+{
+    msgpack_unpacker_destroy(&ctx->unpacker);
+    msgpack_unpacked_destroy(&ctx->unpacked);
+    free(ctx);
+}
+
+static const char *
+cache_sync_error(CacheSyncCtx *ctx)
+{
+    return ctx->error;
+}
+
+static int
+cache_process_chunks(CacheSyncCtx *ctx, msgpack_object_array *array)
+{
+    uint32_t i;
+    const char *key;
+    uint32_t cache_values[3];
+    uint32_t *cache_entry;
+    uint64_t refcount;
+    msgpack_object *current;
+    for (i = 0; i < array->size; i++) {
+        current = &array->ptr[i];
+
+        if (current->type != MSGPACK_OBJECT_ARRAY || current->via.array.size != 3
+            || current->via.array.ptr[0].type != MSGPACK_OBJECT_STR || current->via.array.ptr[0].via.str.size != 32
+            || current->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER
+            || current->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
+            ctx->error = "Malformed chunk list entry";
+            return 0;
+        }
+
+        key = current->via.array.ptr[0].via.str.ptr;
+        cache_entry = (uint32_t*) hashindex_get(ctx->chunks, key);
+        if (cache_entry) {
+            refcount = _le32toh(cache_entry[0]);
+            refcount += 1;
+            cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE));
+        } else {
+            /* refcount, size, csize */
+            cache_values[0] = 1;
+            cache_values[1] = current->via.array.ptr[1].via.u64;
+            cache_values[2] = current->via.array.ptr[2].via.u64;
+            if (!hashindex_set(ctx->chunks, key, cache_values)) {
+                ctx->error = "hashindex_set failed";
+                return 0;
+            }
+        }
+    }
+    return 1;
+}
+
+/**
+ * feed data to the cache synchronizer
+ * 0 = abort, 1 = continue
+ * abort is a regular condition, check cache_sync_error
+ */
+static int
+cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
+{
+    msgpack_unpack_return unpack_status;
+
+    /* grow buffer if necessary */
+    if (msgpack_unpacker_buffer_capacity(&ctx->unpacker) < length) {
+        if (!msgpack_unpacker_reserve_buffer(&ctx->unpacker, length)) {
+            return 0;
+        }
+    }
+
+    memcpy(msgpack_unpacker_buffer(&ctx->unpacker), data, length);
+    msgpack_unpacker_buffer_consumed(&ctx->unpacker, length);
+
+    do {
+        unpack_status = msgpack_unpacker_next(&ctx->unpacker, &ctx->unpacked);
+
+        switch (unpack_status) {
+        case MSGPACK_UNPACK_SUCCESS:
+            {
+                uint32_t i;
+                msgpack_object *item = &ctx->unpacked.data;
+                msgpack_object_kv *current;
+
+                if (item->type != MSGPACK_OBJECT_MAP) {
+                    ctx->error = "Unexpected data type in item stream";
+                    return 0;
+                }
+
+                for (i = 0; i < item->via.map.size; i++) {
+                    current = &item->via.map.ptr[i];
+
+                    if (current->key.type != MSGPACK_OBJECT_STR) {
+                        ctx->error = "Invalid key data type in item";
+                        return 0;
+                    }
+
+                    if (current->key.via.str.size == 6
+                        && !memcmp(current->key.via.str.ptr, "chunks", 6)) {
+
+                        if (current->val.type != MSGPACK_OBJECT_ARRAY) {
+                            ctx->error = "Unexpected value type of item chunks";
+                            return 0;
+                        }
+
+                        if (!cache_process_chunks(ctx, &current->val.via.array)) {
+                            return 0;
+                        }
+                    }
+                }
+            }
+            break;
+        case MSGPACK_UNPACK_PARSE_ERROR:
+            ctx->error = "Malformed msgpack";
+            return 0;
+        default:
+            break;
+        }
+    } while (unpack_status != MSGPACK_UNPACK_CONTINUE);
+
+    return 1;
+}

+ 3 - 9
src/borg/cache.py

@@ -12,7 +12,7 @@ from .logger import create_logger
 logger = create_logger()
 
 from .constants import CACHE_README
-from .hashindex import ChunkIndex, ChunkIndexEntry
+from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Location
 from .helpers import Error
 from .helpers import get_cache_dir, get_security_dir
@@ -571,17 +571,11 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
             if archive.version != 1:
                 raise Exception('Unknown archive metadata version')
-            unpacker = msgpack.Unpacker()
+            sync = CacheSynchronizer(chunk_idx)
             for item_id, chunk in zip(archive.items, repository.get_many(archive.items)):
                 data = key.decrypt(item_id, chunk)
                 chunk_idx.add(item_id, 1, len(data), len(chunk))
-                unpacker.feed(data)
-                for item in unpacker:
-                    if not isinstance(item, dict):
-                        logger.error('Error: Did not get expected metadata dict - archive corrupted!')
-                        continue   # XXX: continue?!
-                    for chunk_id, size, csize in item.get(b'chunks', []):
-                        chunk_idx.add(chunk_id, 1, size, csize)
+                sync.feed(data)
             if self.do_cache:
                 fn = mkpath(archive_id)
                 fn_tmp = mkpath(archive_id, suffix='.tmp')

+ 34 - 4
src/borg/hashindex.pyx

@@ -8,7 +8,7 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
 from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 
 cdef extern from "_hashindex.c":
@@ -31,6 +31,18 @@ cdef extern from "_hashindex.c":
     double HASH_MAX_LOAD
 
 
+cdef extern from "_cache.c":
+    ctypedef struct CacheSyncCtx:
+        pass
+
+    CacheSyncCtx *cache_sync_init(HashIndex *chunks)
+    const char *cache_sync_error(CacheSyncCtx *ctx)
+    int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
+    void cache_sync_free(CacheSyncCtx *ctx)
+
+    uint32_t _MAX_VALUE
+
+
 cdef _NoDefault = object()
 
 """
@@ -50,9 +62,6 @@ AssertionError is raised instead.
 
 assert UINT32_MAX == 2**32-1
 
-# module-level constant because cdef's in classes can't have default values
-cdef uint32_t _MAX_VALUE = 2**32-1025
-
 assert _MAX_VALUE % 2 == 1
 
 
@@ -375,3 +384,24 @@ cdef class ChunkKeyIterator:
         cdef uint32_t refcount = _le32toh(value[0])
         assert refcount <= _MAX_VALUE, "invalid reference count"
         return (<char *>self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2]))
+
+
+cdef class CacheSynchronizer:
+    cdef ChunkIndex chunks
+    cdef CacheSyncCtx *sync
+
+    def __cinit__(self, chunks):
+        self.chunks = chunks
+        self.sync = cache_sync_init(self.chunks.index)
+        if not self.sync:
+            raise Exception('cache_sync_init failed')
+
+    def __dealloc__(self):
+        if self.sync:
+            cache_sync_free(self.sync)
+
+    def feed(self, chunk):
+        if not cache_sync_feed(self.sync, <char *>chunk, len(chunk)):
+            error = cache_sync_error(self.sync)
+            if error is not None:
+                raise Exception('cache_sync_feed failed: ' + error.decode('ascii'))

+ 1 - 1
src/borg/helpers.py

@@ -126,7 +126,7 @@ def check_python():
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_01':
+    if hashindex.API_VERSION != '1.1_02':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError