Browse Source

Merge pull request #2572 from enkore/f/fastcachesync

Improve cache sync speed
enkore 8 years ago
parent
commit
d06ee5648c

+ 10 - 0
scripts/fuzz-cache-sync/HOWTO

@@ -0,0 +1,10 @@
+- Install AFL and the requirements for LLVM mode (see docs)
+- Compile the fuzzing target, e.g.
+
+  AFL_HARDEN=1 afl-clang-fast main.c -o fuzz-target -O3
+
+  (other options, like using ASan or MSan are possible as well)
+- Add additional test cases to testcase_dir
+- Run AFL; the easiest (but inefficient) way:
+
+  afl-fuzz -i testcase_dir -o findings_dir ./fuzz-target

+ 30 - 0
scripts/fuzz-cache-sync/main.c

@@ -0,0 +1,30 @@
+#include "../../src/borg/_hashindex.c"
+#include "../../src/borg/cache_sync/cache_sync.c"
+
+#define BUFSZ 32768
+
+int main() {
+    char buf[BUFSZ];
+    int len, ret;
+    CacheSyncCtx *ctx;
+    HashIndex *idx;
+
+    /* capacity, key size, value size */
+    idx = hashindex_init(0, 32, 12);
+    ctx = cache_sync_init(idx);
+
+    while (1) {
+        len = read(0, buf, BUFSZ);
+        if (!len) {
+            break;
+        }
+        ret = cache_sync_feed(ctx, buf, len);
+        if(!ret && cache_sync_error(ctx)) {
+            fprintf(stderr, "error: %s\n", cache_sync_error(ctx));
+            return 1;
+        }
+    }
+    hashindex_free(idx);
+    cache_sync_free(ctx);
+    return 0;
+}

BIN
scripts/fuzz-cache-sync/testcase_dir/test_simple


+ 2 - 0
setup.py

@@ -91,6 +91,8 @@ try:
                 'src/borg/crypto/low_level.c',
                 'src/borg/chunker.c', 'src/borg/_chunker.c',
                 'src/borg/hashindex.c', 'src/borg/_hashindex.c',
+                'src/borg/cache_sync/cache_sync.c', 'src/borg/cache_sync/sysdep.h', 'src/borg/cache_sync/unpack.h',
+                'src/borg/cache_sync/unpack_define.h', 'src/borg/cache_sync/unpack_template.h',
                 'src/borg/item.c',
                 'src/borg/algorithms/checksums.c',
                 'src/borg/algorithms/crc32_dispatch.c', 'src/borg/algorithms/crc32_clmul.c', 'src/borg/algorithms/crc32_slice_by_8.c',

+ 16 - 3
src/borg/_hashindex.c

@@ -1,6 +1,6 @@
-#include <Python.h>
 
 #include <assert.h>
+#include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdint.h>
@@ -56,8 +56,10 @@ typedef struct {
     int lower_limit;
     int upper_limit;
     int min_empty;
+#ifdef Py_PYTHON_H
     /* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */
     Py_buffer buckets_buffer;
+#endif
 } HashIndex;
 
 /* prime (or w/ big prime factors) hash table sizes
@@ -106,8 +108,11 @@ static int hash_sizes[] = {
 #define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno))
 #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno))
 
+#ifdef Py_PYTHON_H
 static HashIndex *hashindex_read(PyObject *file_py);
 static void hashindex_write(HashIndex *index, PyObject *file_py);
+#endif
+
 static HashIndex *hashindex_init(int capacity, int key_size, int value_size);
 static const void *hashindex_get(HashIndex *index, const void *key);
 static int hashindex_set(HashIndex *index, const void *key, const void *value);
@@ -120,9 +125,12 @@ static void hashindex_free(HashIndex *index);
 static void
 hashindex_free_buckets(HashIndex *index)
 {
+#ifdef Py_PYTHON_H
     if(index->buckets_buffer.buf) {
         PyBuffer_Release(&index->buckets_buffer);
-    } else {
+    } else
+#endif
+    {
         free(index->buckets);
     }
 }
@@ -263,6 +271,7 @@ count_empty(HashIndex *index)
 
 /* Public API */
 
+#ifdef Py_PYTHON_H
 static HashIndex *
 hashindex_read(PyObject *file_py)
 {
@@ -418,6 +427,7 @@ fail_decref_header:
 fail:
     return index;
 }
+#endif
 
 static HashIndex *
 hashindex_init(int capacity, int key_size, int value_size)
@@ -444,7 +454,9 @@ hashindex_init(int capacity, int key_size, int value_size)
     index->lower_limit = get_lower_limit(index->num_buckets);
     index->upper_limit = get_upper_limit(index->num_buckets);
     index->min_empty = get_min_empty(index->num_buckets);
+#ifdef Py_PYTHON_H
     index->buckets_buffer.buf = NULL;
+#endif
     for(i = 0; i < capacity; i++) {
         BUCKET_MARK_EMPTY(index, i);
     }
@@ -458,7 +470,7 @@ hashindex_free(HashIndex *index)
     free(index);
 }
 
-
+#ifdef Py_PYTHON_H
 static void
 hashindex_write(HashIndex *index, PyObject *file_py)
 {
@@ -521,6 +533,7 @@ hashindex_write(HashIndex *index, PyObject *file_py)
         return;
     }
 }
+#endif
 
 static const void *
 hashindex_get(HashIndex *index, const void *key)

+ 32 - 33
src/borg/cache.py

@@ -12,7 +12,7 @@ from .logger import create_logger
 logger = create_logger()
 
 from .constants import CACHE_README
-from .hashindex import ChunkIndex, ChunkIndexEntry
+from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Location
 from .helpers import Error
 from .helpers import get_cache_dir, get_security_dir
@@ -564,24 +564,16 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             except FileNotFoundError:
                 pass
 
-        def fetch_and_build_idx(archive_id, repository, key, chunk_idx):
-            cdata = repository.get(archive_id)
-            data = key.decrypt(archive_id, cdata)
-            chunk_idx.add(archive_id, 1, len(data), len(cdata))
+        def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
+            csize, data = decrypted_repository.get(archive_id)
+            chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
             if archive.version != 1:
                 raise Exception('Unknown archive metadata version')
-            unpacker = msgpack.Unpacker()
-            for item_id, chunk in zip(archive.items, repository.get_many(archive.items)):
-                data = key.decrypt(item_id, chunk)
-                chunk_idx.add(item_id, 1, len(data), len(chunk))
-                unpacker.feed(data)
-                for item in unpacker:
-                    if not isinstance(item, dict):
-                        logger.error('Error: Did not get expected metadata dict - archive corrupted!')
-                        continue   # XXX: continue?!
-                    for chunk_id, size, csize in item.get(b'chunks', []):
-                        chunk_idx.add(chunk_id, 1, size, csize)
+            sync = CacheSynchronizer(chunk_idx)
+            for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
+                chunk_idx.add(item_id, 1, len(data), csize)
+                sync.feed(data)
             if self.do_cache:
                 fn = mkpath(archive_id)
                 fn_tmp = mkpath(archive_id, suffix='.tmp')
@@ -594,10 +586,17 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 else:
                     os.rename(fn_tmp, fn)
 
-        def lookup_name(archive_id):
+        def get_archive_ids_to_names(archive_ids):
+            # Pass once over all archives and build a mapping from ids to names.
+            # The easier approach, doing a similar loop for each archive, has
+            # quadratic complexity and does about a dozen million function calls
+            # with 1100 archives (which takes 30 CPU seconds _alone_).
+            archive_names = {}
             for info in self.manifest.archives.list():
-                if info.id == archive_id:
-                    return info.name
+                if info.id in archive_ids:
+                    archive_names[info.id] = info.name
+            assert len(archive_names) == len(archive_ids)
+            return archive_names
 
         def create_master_idx(chunk_idx):
             logger.info('Synchronizing chunks cache...')
@@ -609,16 +608,17 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             # deallocates old hashindex, creates empty hashindex:
             chunk_idx.clear()
             cleanup_outdated(cached_ids - archive_ids)
+            # Explicitly set the initial hash table capacity to avoid performance issues
+            # due to hash table "resonance".
+            master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
             if archive_ids:
                 chunk_idx = None
-                if self.progress:
-                    pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
-                                                  msg='%3.0f%% Syncing chunks cache. Processing archive %s',
-                                                  msgid='cache.sync')
-                for archive_id in archive_ids:
-                    archive_name = lookup_name(archive_id)
-                    if self.progress:
-                        pi.show(info=[remove_surrogates(archive_name)])
+                pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
+                                              msg='%3.0f%% Syncing chunks cache. Processing archive %s',
+                                              msgid='cache.sync')
+                archive_ids_to_names = get_archive_ids_to_names(archive_ids)
+                for archive_id, archive_name in archive_ids_to_names.items():
+                    pi.show(info=[remove_surrogates(archive_name)])
                     if self.do_cache:
                         if archive_id in cached_ids:
                             archive_chunk_idx_path = mkpath(archive_id)
@@ -637,7 +637,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                             # above can remove *archive_id* from *cached_ids*.
                             logger.info('Fetching and building archive index for %s ...', archive_name)
                             archive_chunk_idx = ChunkIndex()
-                            fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx)
+                            fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                         logger.info("Merging into master chunks index ...")
                         logger.info("Merging into master chunks index ...")
                         if chunk_idx is None:
                         if chunk_idx is None:
                             # we just use the first archive's idx as starting point,
                             # we just use the first archive's idx as starting point,
@@ -647,11 +647,10 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                         else:
                             chunk_idx.merge(archive_chunk_idx)
                     else:
-                        chunk_idx = chunk_idx or ChunkIndex()
+                        chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                         logger.info('Fetching archive index for %s ...', archive_name)
-                        fetch_and_build_idx(archive_id, repository, self.key, chunk_idx)
-                if self.progress:
-                    pi.finish()
+                        fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
+                pi.finish()
             logger.info('Done.')
             return chunk_idx
 
@@ -671,7 +670,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 pass
 
         self.begin_txn()
-        with cache_if_remote(self.repository) as repository:
+        with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository:
             legacy_cleanup()
             # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
             # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)

+ 122 - 0
src/borg/cache_sync/cache_sync.c

@@ -0,0 +1,122 @@
+/*
+ * Borg cache synchronizer,
+ * high level interface.
+ *
+ * These routines parse msgpacked item metadata and update a HashIndex
+ * with all chunks that are referenced from the items.
+ *
+ * This file only contains some initialization and buffer management.
+ *
+ * The parser is split into two parts, somewhat similar to a lexer/parser combination:
+ *
+ * unpack_template.h munches msgpack and calls a specific callback for each object
+ * encountered (e.g. beginning of a map, an integer, a string, a map item etc.).
+ *
+ * unpack.h implements these callbacks and uses another state machine to
+ * extract chunk references from it.
+ */
+
+#include "unpack.h"
+
+typedef struct {
+    unpack_context ctx;
+
+    char *buf;
+    size_t head;
+    size_t tail;
+    size_t size;
+} CacheSyncCtx;
+
+static CacheSyncCtx *
+cache_sync_init(HashIndex *chunks)
+{
+    CacheSyncCtx *ctx;
+    if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) {
+        return NULL;
+    }
+
+    unpack_init(&ctx->ctx);
+    /* needs to be set only once */
+    ctx->ctx.user.chunks = chunks;
+    ctx->buf = NULL;
+    ctx->head = 0;
+    ctx->tail = 0;
+    ctx->size = 0;
+
+    return ctx;
+}
+
+static void
+cache_sync_free(CacheSyncCtx *ctx)
+{
+    if(ctx->buf) {
+        free(ctx->buf);
+    }
+    free(ctx);
+}
+
+static const char *
+cache_sync_error(CacheSyncCtx *ctx)
+{
+    return ctx->ctx.user.last_error;
+}
+
+/**
+ * feed data to the cache synchronizer
+ * 0 = abort, 1 = continue
+ * abort is a regular condition, check cache_sync_error
+ */
+static int
+cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
+{
+    size_t new_size;
+    int ret;
+    char *new_buf;
+
+    if(ctx->tail + length > ctx->size) {
+        if((ctx->tail - ctx->head) + length <= ctx->size) {
+            /* |  XXXXX| -> move data in buffer backwards -> |XXXXX  | */
+            memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
+            ctx->tail -= ctx->head;
+            ctx->head = 0;
+        } else {
+            /* must expand buffer to fit all data */
+            new_size = (ctx->tail - ctx->head) + length;
+            new_buf = (char*) malloc(new_size);
+            if(!new_buf) {
+                ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer";
+                return 0;
+            }
+            memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
+            free(ctx->buf);
+            ctx->buf = new_buf;
+            ctx->tail -= ctx->head;
+            ctx->head = 0;
+            ctx->size = new_size;
+        }
+    }
+
+    memcpy(ctx->buf + ctx->tail, data, length);
+    ctx->tail += length;
+
+    while(1) {
+        if(ctx->head >= ctx->tail) {
+            return 1;  /* request more bytes */
+        }
+
+        ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head);
+        if(ret == 1) {
+            unpack_init(&ctx->ctx);
+            continue;
+        } else if(ret == 0) {
+            return 1;
+        } else {
+            if(!ctx->ctx.user.last_error) {
+                ctx->ctx.user.last_error = "Unknown error";
+            }
+            return 0;
+        }
+    }
+    /* unreachable */
+    return 1;
+}
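
Taken together with the state machine in unpack.h, the net effect of this parser can be stated in Python terms. The sketch below is for orientation only and assumes a plain dict in place of the C HashIndex (refcount clamping at _MAX_VALUE and error handling omitted): each item is a msgpack map, only its 'chunks' list matters, and every entry in that list is a (key, size, csize) tuple with a 32-byte chunk id.

    # Illustrative Python equivalent of what the parser records per item.
    def add_item_chunks(chunks, item):
        """chunks maps 32-byte chunk ids to (refcount, size, csize) tuples."""
        for key, size, csize in item.get(b'chunks', []):
            if key in chunks:
                # existing entry: only the reference count is bumped
                refcount, old_size, old_csize = chunks[key]
                chunks[key] = (refcount + 1, old_size, old_csize)
            else:
                # first reference: store (1, size, csize)
                chunks[key] = (1, size, csize)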

+ 194 - 0
src/borg/cache_sync/sysdep.h

@@ -0,0 +1,194 @@
+/*
+ * MessagePack system dependencies
+ *
+ * Copyright (C) 2008-2010 FURUHASHI Sadayuki
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+#ifndef MSGPACK_SYSDEP_H__
+#define MSGPACK_SYSDEP_H__
+
+#include <stdlib.h>
+#include <stddef.h>
+#if defined(_MSC_VER) && _MSC_VER < 1600
+typedef __int8 int8_t;
+typedef unsigned __int8 uint8_t;
+typedef __int16 int16_t;
+typedef unsigned __int16 uint16_t;
+typedef __int32 int32_t;
+typedef unsigned __int32 uint32_t;
+typedef __int64 int64_t;
+typedef unsigned __int64 uint64_t;
+#elif defined(_MSC_VER)  // && _MSC_VER >= 1600
+#include <stdint.h>
+#else
+#include <stdint.h>
+#include <stdbool.h>
+#endif
+
+#ifdef _WIN32
+#define _msgpack_atomic_counter_header <windows.h>
+typedef long _msgpack_atomic_counter_t;
+#define _msgpack_sync_decr_and_fetch(ptr) InterlockedDecrement(ptr)
+#define _msgpack_sync_incr_and_fetch(ptr) InterlockedIncrement(ptr)
+#elif defined(__GNUC__) && ((__GNUC__*10 + __GNUC_MINOR__) < 41)
+#define _msgpack_atomic_counter_header "gcc_atomic.h"
+#else
+typedef unsigned int _msgpack_atomic_counter_t;
+#define _msgpack_sync_decr_and_fetch(ptr) __sync_sub_and_fetch(ptr, 1)
+#define _msgpack_sync_incr_and_fetch(ptr) __sync_add_and_fetch(ptr, 1)
+#endif
+
+#ifdef _WIN32
+
+#ifdef __cplusplus
+/* numeric_limits<T>::min,max */
+#ifdef max
+#undef max
+#endif
+#ifdef min
+#undef min
+#endif
+#endif
+
+#else
+#include <arpa/inet.h>  /* __BYTE_ORDER */
+#endif
+
+#if !defined(__LITTLE_ENDIAN__) && !defined(__BIG_ENDIAN__)
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+#define __LITTLE_ENDIAN__
+#elif __BYTE_ORDER == __BIG_ENDIAN
+#define __BIG_ENDIAN__
+#elif _WIN32
+#define __LITTLE_ENDIAN__
+#endif
+#endif
+
+
+#ifdef __LITTLE_ENDIAN__
+
+#ifdef _WIN32
+#  if defined(ntohs)
+#    define _msgpack_be16(x) ntohs(x)
+#  elif defined(_byteswap_ushort) || (defined(_MSC_VER) && _MSC_VER >= 1400)
+#    define _msgpack_be16(x) ((uint16_t)_byteswap_ushort((unsigned short)x))
+#  else
+#    define _msgpack_be16(x) ( \
+        ((((uint16_t)x) <<  8) ) | \
+        ((((uint16_t)x) >>  8) ) )
+#  endif
+#else
+#  define _msgpack_be16(x) ntohs(x)
+#endif
+
+#ifdef _WIN32
+#  if defined(ntohl)
+#    define _msgpack_be32(x) ntohl(x)
+#  elif defined(_byteswap_ulong) || (defined(_MSC_VER) && _MSC_VER >= 1400)
+#    define _msgpack_be32(x) ((uint32_t)_byteswap_ulong((unsigned long)x))
+#  else
+#    define _msgpack_be32(x) \
+        ( ((((uint32_t)x) << 24)               ) | \
+          ((((uint32_t)x) <<  8) & 0x00ff0000U ) | \
+          ((((uint32_t)x) >>  8) & 0x0000ff00U ) | \
+          ((((uint32_t)x) >> 24)               ) )
+#  endif
+#else
+#  define _msgpack_be32(x) ntohl(x)
+#endif
+
+#if defined(_byteswap_uint64) || (defined(_MSC_VER) && _MSC_VER >= 1400)
+#  define _msgpack_be64(x) (_byteswap_uint64(x))
+#elif defined(bswap_64)
+#  define _msgpack_be64(x) bswap_64(x)
+#elif defined(__DARWIN_OSSwapInt64)
+#  define _msgpack_be64(x) __DARWIN_OSSwapInt64(x)
+#else
+#define _msgpack_be64(x) \
+    ( ((((uint64_t)x) << 56)                         ) | \
+      ((((uint64_t)x) << 40) & 0x00ff000000000000ULL ) | \
+      ((((uint64_t)x) << 24) & 0x0000ff0000000000ULL ) | \
+      ((((uint64_t)x) <<  8) & 0x000000ff00000000ULL ) | \
+      ((((uint64_t)x) >>  8) & 0x00000000ff000000ULL ) | \
+      ((((uint64_t)x) >> 24) & 0x0000000000ff0000ULL ) | \
+      ((((uint64_t)x) >> 40) & 0x000000000000ff00ULL ) | \
+      ((((uint64_t)x) >> 56)                         ) )
+#endif
+
+#define _msgpack_load16(cast, from) ((cast)( \
+        (((uint16_t)((uint8_t*)(from))[0]) << 8) | \
+        (((uint16_t)((uint8_t*)(from))[1])     ) ))
+
+#define _msgpack_load32(cast, from) ((cast)( \
+        (((uint32_t)((uint8_t*)(from))[0]) << 24) | \
+        (((uint32_t)((uint8_t*)(from))[1]) << 16) | \
+        (((uint32_t)((uint8_t*)(from))[2]) <<  8) | \
+        (((uint32_t)((uint8_t*)(from))[3])      ) ))
+
+#define _msgpack_load64(cast, from) ((cast)( \
+        (((uint64_t)((uint8_t*)(from))[0]) << 56) | \
+        (((uint64_t)((uint8_t*)(from))[1]) << 48) | \
+        (((uint64_t)((uint8_t*)(from))[2]) << 40) | \
+        (((uint64_t)((uint8_t*)(from))[3]) << 32) | \
+        (((uint64_t)((uint8_t*)(from))[4]) << 24) | \
+        (((uint64_t)((uint8_t*)(from))[5]) << 16) | \
+        (((uint64_t)((uint8_t*)(from))[6]) << 8)  | \
+        (((uint64_t)((uint8_t*)(from))[7])     )  ))
+
+#else
+
+#define _msgpack_be16(x) (x)
+#define _msgpack_be32(x) (x)
+#define _msgpack_be64(x) (x)
+
+#define _msgpack_load16(cast, from) ((cast)( \
+        (((uint16_t)((uint8_t*)from)[0]) << 8) | \
+        (((uint16_t)((uint8_t*)from)[1])     ) ))
+
+#define _msgpack_load32(cast, from) ((cast)( \
+        (((uint32_t)((uint8_t*)from)[0]) << 24) | \
+        (((uint32_t)((uint8_t*)from)[1]) << 16) | \
+        (((uint32_t)((uint8_t*)from)[2]) <<  8) | \
+        (((uint32_t)((uint8_t*)from)[3])      ) ))
+
+#define _msgpack_load64(cast, from) ((cast)( \
+        (((uint64_t)((uint8_t*)from)[0]) << 56) | \
+        (((uint64_t)((uint8_t*)from)[1]) << 48) | \
+        (((uint64_t)((uint8_t*)from)[2]) << 40) | \
+        (((uint64_t)((uint8_t*)from)[3]) << 32) | \
+        (((uint64_t)((uint8_t*)from)[4]) << 24) | \
+        (((uint64_t)((uint8_t*)from)[5]) << 16) | \
+        (((uint64_t)((uint8_t*)from)[6]) << 8)  | \
+        (((uint64_t)((uint8_t*)from)[7])     )  ))
+#endif
+
+
+#define _msgpack_store16(to, num) \
+    do { uint16_t val = _msgpack_be16(num); memcpy(to, &val, 2); } while(0)
+#define _msgpack_store32(to, num) \
+    do { uint32_t val = _msgpack_be32(num); memcpy(to, &val, 4); } while(0)
+#define _msgpack_store64(to, num) \
+    do { uint64_t val = _msgpack_be64(num); memcpy(to, &val, 8); } while(0)
+
+/*
+#define _msgpack_load16(cast, from) \
+    ({ cast val; memcpy(&val, (char*)from, 2); _msgpack_be16(val); })
+#define _msgpack_load32(cast, from) \
+    ({ cast val; memcpy(&val, (char*)from, 4); _msgpack_be32(val); })
+#define _msgpack_load64(cast, from) \
+    ({ cast val; memcpy(&val, (char*)from, 8); _msgpack_be64(val); })
+*/
+
+
+#endif /* msgpack/sysdep.h */

+ 390 - 0
src/borg/cache_sync/unpack.h

@@ -0,0 +1,390 @@
+/*
+ * Borg cache synchronizer,
+ * based on a MessagePack for Python unpacking routine
+ *
+ * Copyright (C) 2009 Naoki INADA
+ * Copyright (c) 2017 Marian Beermann
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+/*
+ * This limits the depth of the structures we can unpack, i.e. how many containers
+ * are nestable.
+ */
+#define MSGPACK_EMBED_STACK_SIZE  (16)
+#include "unpack_define.h"
+
+// 2**32 - 1025
+#define _MAX_VALUE ( (uint32_t) 4294966271 )
+
+#define MIN(x, y) ((x) < (y) ? (x): (y))
+
+#ifdef DEBUG
+#define SET_LAST_ERROR(msg) \
+  fprintf(stderr, "cache_sync parse error: %s\n", (msg)); \
+  u->last_error = (msg);
+#else
+#define SET_LAST_ERROR(msg) \
+  u->last_error = (msg);
+#endif
+
+typedef struct unpack_user {
+    /* Item.chunks is at the top level; we don't care about anything else,
+     * only need to track the current level to navigate arbitrary and unknown structure.
+     * To discern keys from everything else on the top level we use expect_map_item_end.
+     */
+    int level;
+
+    const char *last_error;
+
+    HashIndex *chunks;
+
+    /*
+     * We don't care about most stuff. This flag tells us whether we're at the chunks structure,
+     * meaning:
+     * {'foo': 'bar', 'chunks': [...], 'stuff': ... }
+     *                        ^-HERE-^
+     */
+    int inside_chunks;
+    enum {
+        /* the next thing is a map key at the Item root level,
+         * and it might be the "chunks" key we're looking for */
+        expect_chunks_map_key,
+
+        /* blocking state to expect_chunks_map_key
+         * {     'stuff': <complex and arbitrary structure>,     'chunks': [
+         * ecmk    ->   emie    ->   ->       ->      ->   ecmk  ecb       eeboce
+         *                (nested containers are tracked via level)
+         * ecmk=expect_chunks_map_key, emie=expect_map_item_end, ecb=expect_chunks_begin,
+         * eeboce=expect_entry_begin_or_chunks_end
+         */
+        expect_map_item_end,
+
+        /* next thing must be the chunks array (array) */
+        expect_chunks_begin,
+
+        /* next thing must either be another CLE (array) or end of Item.chunks (array_end) */
+        expect_entry_begin_or_chunks_end,
+
+        /*
+         * processing ChunkListEntry tuple:
+         * expect_key, expect_size, expect_csize, expect_entry_end
+         */
+        /* next thing must be the key (raw, l=32) */
+        expect_key,
+        /* next thing must be the size (int) */
+        expect_size,
+        /* next thing must be the csize (int) */
+        expect_csize,
+        /* next thing must be the end of the CLE (array_end) */
+        expect_entry_end,
+
+        expect_item_begin
+    } expect;
+
+    struct {
+        char key[32];
+        uint32_t csize;
+        uint32_t size;
+    } current;
+} unpack_user;
+
+struct unpack_context;
+typedef struct unpack_context unpack_context;
+typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, size_t* off);
+
+#define UNEXPECTED(what)                                            \
+    if(u->inside_chunks || u->expect == expect_chunks_map_key) { \
+        SET_LAST_ERROR("Unexpected object: " what);                 \
+        return -1;                                                  \
+    }
+
+static inline void unpack_init_user_state(unpack_user *u)
+{
+    u->last_error = NULL;
+    u->level = 0;
+    u->inside_chunks = false;
+    u->expect = expect_item_begin;
+}
+
+static inline int unpack_callback_int64(unpack_user* u, int64_t d)
+{
+    switch(u->expect) {
+        case expect_size:
+            u->current.size = d;
+            u->expect = expect_csize;
+            break;
+        case expect_csize:
+            u->current.csize = d;
+            u->expect = expect_entry_end;
+            break;
+        default:
+            UNEXPECTED("integer");
+    }
+    return 0;
+}
+
+static inline int unpack_callback_uint16(unpack_user* u, uint16_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+static inline int unpack_callback_uint8(unpack_user* u, uint8_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+
+static inline int unpack_callback_uint32(unpack_user* u, uint32_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+static inline int unpack_callback_uint64(unpack_user* u, uint64_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+static inline int unpack_callback_int32(unpack_user* u, int32_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+static inline int unpack_callback_int16(unpack_user* u, int16_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+static inline int unpack_callback_int8(unpack_user* u, int8_t d)
+{
+    return unpack_callback_int64(u, d);
+}
+
+/* Ain't got anything to do with those floats */
+static inline int unpack_callback_double(unpack_user* u, double d)
+{
+    (void)d;
+    UNEXPECTED("double");
+    return 0;
+}
+
+static inline int unpack_callback_float(unpack_user* u, float d)
+{
+    (void)d;
+    UNEXPECTED("float");
+    return 0;
+}
+
+/* nil/true/false — I/don't/care */
+static inline int unpack_callback_nil(unpack_user* u)
+{
+    UNEXPECTED("nil");
+    return 0;
+}
+
+static inline int unpack_callback_true(unpack_user* u)
+{
+    UNEXPECTED("true");
+    return 0;
+}
+
+static inline int unpack_callback_false(unpack_user* u)
+{
+    UNEXPECTED("false");
+    return 0;
+}
+
+static inline int unpack_callback_array(unpack_user* u, unsigned int n)
+{
+    switch(u->expect) {
+    case expect_chunks_begin:
+        /* b'chunks': [
+         *            ^ */
+        u->expect = expect_entry_begin_or_chunks_end;
+        break;
+    case expect_entry_begin_or_chunks_end:
+        /* b'chunks': [ (
+         *              ^ */
+        if(n != 3) {
+            SET_LAST_ERROR("Invalid chunk list entry length");
+            return -1;
+        }
+        u->expect = expect_key;
+        break;
+    default:
+        if(u->inside_chunks) {
+            SET_LAST_ERROR("Unexpected array start");
+            return -1;
+        } else {
+            u->level++;
+            return 0;
+        }
+    }
+    return 0;
+}
+
+static inline int unpack_callback_array_item(unpack_user* u, unsigned int current)
+{
+    (void)u; (void)current;
+    return 0;
+}
+
+static inline int unpack_callback_array_end(unpack_user* u)
+{
+    uint32_t *cache_entry;
+    uint32_t cache_values[3];
+    uint64_t refcount;
+
+    switch(u->expect) {
+    case expect_entry_end:
+        /* b'chunks': [ ( b'1234...', 123, 345 )
+         *                                     ^ */
+        cache_entry = (uint32_t*) hashindex_get(u->chunks, u->current.key);
+        if(cache_entry) {
+            refcount = _le32toh(cache_entry[0]);
+            if(refcount > _MAX_VALUE) {
+                SET_LAST_ERROR("invalid reference count");
+                return -1;
+            }
+            refcount += 1;
+            cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE));
+        } else {
+            /* refcount, size, csize */
+            cache_values[0] = _htole32(1);
+            cache_values[1] = _htole32(u->current.size);
+            cache_values[2] = _htole32(u->current.csize);
+            if(!hashindex_set(u->chunks, u->current.key, cache_values)) {
+                SET_LAST_ERROR("hashindex_set failed");
+                return -1;
+            }
+        }
+
+        u->expect = expect_entry_begin_or_chunks_end;
+        break;
+    case expect_entry_begin_or_chunks_end:
+        /* b'chunks': [ ]
+         *              ^ */
+        /* end of Item.chunks */
+        u->inside_chunks = 0;
+        u->expect = expect_map_item_end;
+        break;
+    default:
+        if(u->inside_chunks) {
+            SET_LAST_ERROR("Invalid state transition (unexpected array end)");
+            return -1;
+        } else {
+            u->level--;
+            return 0;
+        }
+    }
+    return 0;
+}
+
+static inline int unpack_callback_map(unpack_user* u, unsigned int n)
+{
+    (void)n;
+
+    if(u->level == 0) {
+        if(u->expect != expect_item_begin) {
+            SET_LAST_ERROR("Invalid state transition");  /* unreachable */
+            return -1;
+        }
+        /* This begins a new Item */
+        u->expect = expect_chunks_map_key;
+    }
+
+    if(u->inside_chunks) {
+        UNEXPECTED("map");
+    }
+
+    u->level++;
+
+    return 0;
+}
+
+static inline int unpack_callback_map_item(unpack_user* u, unsigned int current)
+{
+    (void)u; (void)current;
+
+    if(u->level == 1) {
+        switch(u->expect) {
+        case expect_map_item_end:
+            u->expect = expect_chunks_map_key;
+            break;
+        default:
+            SET_LAST_ERROR("Unexpected map item");
+            return -1;
+        }
+    }
+    return 0;
+}
+
+static inline int unpack_callback_map_end(unpack_user* u)
+{
+    u->level--;
+    if(u->inside_chunks) {
+        SET_LAST_ERROR("Unexpected map end");
+        return -1;
+    }
+    return 0;
+}
+
+static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int length)
+{
+    /* raw = what Borg uses for binary stuff and strings as well */
+    /* Note: p points to an internal buffer which contains `length` bytes. */
+    (void)b;
+
+    switch(u->expect) {
+    case expect_key:
+        if(length != 32) {
+            SET_LAST_ERROR("Incorrect key length");
+            return -1;
+        }
+        memcpy(u->current.key, p, 32);
+        u->expect = expect_size;
+        break;
+    case expect_chunks_map_key:
+        if(length == 6 && !memcmp("chunks", p, 6)) {
+            u->expect = expect_chunks_begin;
+            u->inside_chunks = 1;
+        } else {
+            u->expect = expect_map_item_end;
+        }
+        break;
+    default:
+        if(u->inside_chunks) {
+            SET_LAST_ERROR("Unexpected bytes in chunks structure");
+            return -1;
+        }
+    }
+    return 0;
+}
+
+static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int length)
+{
+    (void)u; (void)b; (void)p; (void)length;
+    UNEXPECTED("bin");
+    return 0;
+}
+
+static inline int unpack_callback_ext(unpack_user* u, const char* base, const char* pos,
+                                      unsigned int length)
+{
+    (void)u; (void)base; (void)pos; (void)length;
+    UNEXPECTED("ext");
+    return 0;
+}
+
+#include "unpack_template.h"

+ 95 - 0
src/borg/cache_sync/unpack_define.h

@@ -0,0 +1,95 @@
+/*
+ * MessagePack unpacking routine template
+ *
+ * Copyright (C) 2008-2010 FURUHASHI Sadayuki
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+#ifndef MSGPACK_UNPACK_DEFINE_H__
+#define MSGPACK_UNPACK_DEFINE_H__
+
+#include "sysdep.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <stdio.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+#ifndef MSGPACK_EMBED_STACK_SIZE
+#define MSGPACK_EMBED_STACK_SIZE 32
+#endif
+
+
+// CS is first byte & 0x1f
+typedef enum {
+    CS_HEADER            = 0x00,  // nil
+
+    //CS_                = 0x01,
+    //CS_                = 0x02,  // false
+    //CS_                = 0x03,  // true
+
+    CS_BIN_8             = 0x04,
+    CS_BIN_16            = 0x05,
+    CS_BIN_32            = 0x06,
+
+    CS_EXT_8             = 0x07,
+    CS_EXT_16            = 0x08,
+    CS_EXT_32            = 0x09,
+
+    CS_FLOAT             = 0x0a,
+    CS_DOUBLE            = 0x0b,
+    CS_UINT_8            = 0x0c,
+    CS_UINT_16           = 0x0d,
+    CS_UINT_32           = 0x0e,
+    CS_UINT_64           = 0x0f,
+    CS_INT_8             = 0x10,
+    CS_INT_16            = 0x11,
+    CS_INT_32            = 0x12,
+    CS_INT_64            = 0x13,
+
+    //CS_FIXEXT1           = 0x14,
+    //CS_FIXEXT2           = 0x15,
+    //CS_FIXEXT4           = 0x16,
+    //CS_FIXEXT8           = 0x17,
+    //CS_FIXEXT16          = 0x18,
+
+    CS_RAW_8             = 0x19,
+    CS_RAW_16            = 0x1a,
+    CS_RAW_32            = 0x1b,
+    CS_ARRAY_16          = 0x1c,
+    CS_ARRAY_32          = 0x1d,
+    CS_MAP_16            = 0x1e,
+    CS_MAP_32            = 0x1f,
+
+    ACS_RAW_VALUE,
+    ACS_BIN_VALUE,
+    ACS_EXT_VALUE,
+} msgpack_unpack_state;
+
+
+typedef enum {
+    CT_ARRAY_ITEM,
+    CT_MAP_KEY,
+    CT_MAP_VALUE,
+} msgpack_container_type;
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* msgpack/unpack_define.h */

+ 365 - 0
src/borg/cache_sync/unpack_template.h

@@ -0,0 +1,365 @@
+/*
+ * MessagePack unpacking routine template
+ *
+ * Copyright (C) 2008-2010 FURUHASHI Sadayuki
+ * Copyright (c) 2017 Marian Beermann
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ *
+ * This has been slightly adapted from the vanilla msgpack-{c, python} version.
+ * Since cache_sync does not intend to build an output data structure,
+ * msgpack_unpack_object and all of its uses was removed.
+ */
+
+#ifndef USE_CASE_RANGE
+#if !defined(_MSC_VER)
+#define USE_CASE_RANGE
+#endif
+#endif
+
+typedef struct unpack_stack {
+    size_t size;
+    size_t count;
+    unsigned int ct;
+} unpack_stack;
+
+struct unpack_context {
+    unpack_user user;
+    unsigned int cs;
+    unsigned int trail;
+    unsigned int top;
+    unpack_stack stack[MSGPACK_EMBED_STACK_SIZE];
+};
+
+static inline void unpack_init(unpack_context* ctx)
+{
+    ctx->cs = CS_HEADER;
+    ctx->trail = 0;
+    ctx->top = 0;
+    unpack_init_user_state(&ctx->user);
+}
+
+#define construct 1
+
+static inline int unpack_execute(unpack_context* ctx, const char* data, size_t len, size_t* off)
+{
+    assert(len >= *off);
+
+    const unsigned char* p = (unsigned char*)data + *off;
+    const unsigned char* const pe = (unsigned char*)data + len;
+    const void* n = NULL;
+
+    unsigned int trail = ctx->trail;
+    unsigned int cs = ctx->cs;
+    unsigned int top = ctx->top;
+    unpack_stack* stack = ctx->stack;
+    unpack_user* user = &ctx->user;
+
+    unpack_stack* c = NULL;
+
+    int ret;
+
+#define construct_cb(name) \
+    construct && unpack_callback ## name
+
+#define push_simple_value(func) \
+    if(construct_cb(func)(user) < 0) { goto _failed; } \
+    goto _push
+#define push_fixed_value(func, arg) \
+    if(construct_cb(func)(user, arg) < 0) { goto _failed; } \
+    goto _push
+#define push_variable_value(func, base, pos, len) \
+    if(construct_cb(func)(user, \
+        (const char*)base, (const char*)pos, len) < 0) { goto _failed; } \
+    goto _push
+
+#define again_fixed_trail(_cs, trail_len) \
+    trail = trail_len; \
+    cs = _cs; \
+    goto _fixed_trail_again
+#define again_fixed_trail_if_zero(_cs, trail_len, ifzero) \
+    trail = trail_len; \
+    if(trail == 0) { goto ifzero; } \
+    cs = _cs; \
+    goto _fixed_trail_again
+
+#define start_container(func, count_, ct_) \
+    if(top >= MSGPACK_EMBED_STACK_SIZE) { goto _failed; } /* FIXME */ \
+    if(construct_cb(func)(user, count_) < 0) { goto _failed; } \
+    if((count_) == 0) { \
+        if (construct_cb(func##_end)(user) < 0) { goto _failed; } \
+        goto _push; } \
+    stack[top].ct = ct_; \
+    stack[top].size  = count_; \
+    stack[top].count = 0; \
+    ++top; \
+    goto _header_again
+
+#define NEXT_CS(p)  ((unsigned int)*p & 0x1f)
+
+#ifdef USE_CASE_RANGE
+#define SWITCH_RANGE_BEGIN     switch(*p) {
+#define SWITCH_RANGE(FROM, TO) case FROM ... TO:
+#define SWITCH_RANGE_DEFAULT   default:
+#define SWITCH_RANGE_END       }
+#else
+#define SWITCH_RANGE_BEGIN     { if(0) {
+#define SWITCH_RANGE(FROM, TO) } else if(FROM <= *p && *p <= TO) {
+#define SWITCH_RANGE_DEFAULT   } else {
+#define SWITCH_RANGE_END       } }
+#endif
+
+    if(p == pe) { goto _out; }
+    do {
+        switch(cs) {
+        case CS_HEADER:
+            SWITCH_RANGE_BEGIN
+            SWITCH_RANGE(0x00, 0x7f)  // Positive Fixnum
+                push_fixed_value(_uint8, *(uint8_t*)p);
+            SWITCH_RANGE(0xe0, 0xff)  // Negative Fixnum
+                push_fixed_value(_int8, *(int8_t*)p);
+            SWITCH_RANGE(0xc0, 0xdf)  // Variable
+                switch(*p) {
+                case 0xc0:  // nil
+                    push_simple_value(_nil);
+                //case 0xc1:  // never used
+                case 0xc2:  // false
+                    push_simple_value(_false);
+                case 0xc3:  // true
+                    push_simple_value(_true);
+                case 0xc4:  // bin 8
+                    again_fixed_trail(NEXT_CS(p), 1);
+                case 0xc5:  // bin 16
+                    again_fixed_trail(NEXT_CS(p), 2);
+                case 0xc6:  // bin 32
+                    again_fixed_trail(NEXT_CS(p), 4);
+                case 0xc7:  // ext 8
+                    again_fixed_trail(NEXT_CS(p), 1);
+                case 0xc8:  // ext 16
+                    again_fixed_trail(NEXT_CS(p), 2);
+                case 0xc9:  // ext 32
+                    again_fixed_trail(NEXT_CS(p), 4);
+                case 0xca:  // float
+                case 0xcb:  // double
+                case 0xcc:  // unsigned int  8
+                case 0xcd:  // unsigned int 16
+                case 0xce:  // unsigned int 32
+                case 0xcf:  // unsigned int 64
+                case 0xd0:  // signed int  8
+                case 0xd1:  // signed int 16
+                case 0xd2:  // signed int 32
+                case 0xd3:  // signed int 64
+                    again_fixed_trail(NEXT_CS(p), 1 << (((unsigned int)*p) & 0x03));
+                case 0xd4:  // fixext 1
+                case 0xd5:  // fixext 2
+                case 0xd6:  // fixext 4
+                case 0xd7:  // fixext 8
+                    again_fixed_trail_if_zero(ACS_EXT_VALUE, 
+                                              (1 << (((unsigned int)*p) & 0x03))+1,
+                                              _ext_zero);
+                case 0xd8:  // fixext 16
+                    again_fixed_trail_if_zero(ACS_EXT_VALUE, 16+1, _ext_zero);
+                case 0xd9:  // str 8
+                    again_fixed_trail(NEXT_CS(p), 1);
+                case 0xda:  // raw 16
+                case 0xdb:  // raw 32
+                case 0xdc:  // array 16
+                case 0xdd:  // array 32
+                case 0xde:  // map 16
+                case 0xdf:  // map 32
+                    again_fixed_trail(NEXT_CS(p), 2 << (((unsigned int)*p) & 0x01));
+                default:
+                    goto _failed;
+                }
+            SWITCH_RANGE(0xa0, 0xbf)  // FixRaw
+                again_fixed_trail_if_zero(ACS_RAW_VALUE, ((unsigned int)*p & 0x1f), _raw_zero);
+            SWITCH_RANGE(0x90, 0x9f)  // FixArray
+                start_container(_array, ((unsigned int)*p) & 0x0f, CT_ARRAY_ITEM);
+            SWITCH_RANGE(0x80, 0x8f)  // FixMap
+                start_container(_map, ((unsigned int)*p) & 0x0f, CT_MAP_KEY);
+
+            SWITCH_RANGE_DEFAULT
+                goto _failed;
+            SWITCH_RANGE_END
+            // end CS_HEADER
+
+
+        _fixed_trail_again:
+            ++p;
+
+        default:
+            if((size_t)(pe - p) < trail) { goto _out; }
+            n = p;  p += trail - 1;
+            switch(cs) {
+            case CS_EXT_8:
+                again_fixed_trail_if_zero(ACS_EXT_VALUE, *(uint8_t*)n+1, _ext_zero);
+            case CS_EXT_16:
+                again_fixed_trail_if_zero(ACS_EXT_VALUE,
+                                          _msgpack_load16(uint16_t,n)+1,
+                                          _ext_zero);
+            case CS_EXT_32:
+                again_fixed_trail_if_zero(ACS_EXT_VALUE,
+                                          _msgpack_load32(uint32_t,n)+1,
+                                          _ext_zero);
+            case CS_FLOAT: {
+                    union { uint32_t i; float f; } mem;
+                    mem.i = _msgpack_load32(uint32_t,n);
+                    push_fixed_value(_float, mem.f); }
+            case CS_DOUBLE: {
+                    union { uint64_t i; double f; } mem;
+                    mem.i = _msgpack_load64(uint64_t,n);
+#if defined(__arm__) && !(__ARM_EABI__) // arm-oabi
+                    // https://github.com/msgpack/msgpack-perl/pull/1
+                    mem.i = (mem.i & 0xFFFFFFFFUL) << 32UL | (mem.i >> 32UL);
+#endif
+                    push_fixed_value(_double, mem.f); }
+            case CS_UINT_8:
+                push_fixed_value(_uint8, *(uint8_t*)n);
+            case CS_UINT_16:
+                push_fixed_value(_uint16, _msgpack_load16(uint16_t,n));
+            case CS_UINT_32:
+                push_fixed_value(_uint32, _msgpack_load32(uint32_t,n));
+            case CS_UINT_64:
+                push_fixed_value(_uint64, _msgpack_load64(uint64_t,n));
+
+            case CS_INT_8:
+                push_fixed_value(_int8, *(int8_t*)n);
+            case CS_INT_16:
+                push_fixed_value(_int16, _msgpack_load16(int16_t,n));
+            case CS_INT_32:
+                push_fixed_value(_int32, _msgpack_load32(int32_t,n));
+            case CS_INT_64:
+                push_fixed_value(_int64, _msgpack_load64(int64_t,n));
+
+            case CS_BIN_8:
+                again_fixed_trail_if_zero(ACS_BIN_VALUE, *(uint8_t*)n, _bin_zero);
+            case CS_BIN_16:
+                again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load16(uint16_t,n), _bin_zero);
+            case CS_BIN_32:
+                again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load32(uint32_t,n), _bin_zero);
+            case ACS_BIN_VALUE:
+            _bin_zero:
+                push_variable_value(_bin, data, n, trail);
+
+            case CS_RAW_8:
+                again_fixed_trail_if_zero(ACS_RAW_VALUE, *(uint8_t*)n, _raw_zero);
+            case CS_RAW_16:
+                again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load16(uint16_t,n), _raw_zero);
+            case CS_RAW_32:
+                again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load32(uint32_t,n), _raw_zero);
+            case ACS_RAW_VALUE:
+            _raw_zero:
+                push_variable_value(_raw, data, n, trail);
+
+            case ACS_EXT_VALUE:
+            _ext_zero:
+                push_variable_value(_ext, data, n, trail);
+
+            case CS_ARRAY_16:
+                start_container(_array, _msgpack_load16(uint16_t,n), CT_ARRAY_ITEM);
+            case CS_ARRAY_32:
+                /* FIXME security guard */
+                start_container(_array, _msgpack_load32(uint32_t,n), CT_ARRAY_ITEM);
+
+            case CS_MAP_16:
+                start_container(_map, _msgpack_load16(uint16_t,n), CT_MAP_KEY);
+            case CS_MAP_32:
+                /* FIXME security guard */
+                start_container(_map, _msgpack_load32(uint32_t,n), CT_MAP_KEY);
+
+            default:
+                goto _failed;
+            }
+        }
+
+_push:
+    if(top == 0) { goto _finish; }
+    c = &stack[top-1];
+    switch(c->ct) {
+    case CT_ARRAY_ITEM:
+        if(construct_cb(_array_item)(user, c->count) < 0) { goto _failed; }
+        if(++c->count == c->size) {
+            if (construct_cb(_array_end)(user) < 0) { goto _failed; }
+            --top;
+            /*printf("stack pop %d\n", top);*/
+            goto _push;
+        }
+        goto _header_again;
+    case CT_MAP_KEY:
+        c->ct = CT_MAP_VALUE;
+        goto _header_again;
+    case CT_MAP_VALUE:
+        if(construct_cb(_map_item)(user, c->count) < 0) { goto _failed; }
+        if(++c->count == c->size) {
+            if (construct_cb(_map_end)(user) < 0) { goto _failed; }
+            --top;
+            /*printf("stack pop %d\n", top);*/
+            goto _push;
+        }
+        c->ct = CT_MAP_KEY;
+        goto _header_again;
+
+    default:
+        goto _failed;
+    }
+
+_header_again:
+        cs = CS_HEADER;
+        ++p;
+    } while(p != pe);
+    goto _out;
+
+
+_finish:
+    if (!construct)
+        unpack_callback_nil(user);
+    ++p;
+    ret = 1;
+    /* printf("-- finish --\n"); */
+    goto _end;
+
+_failed:
+    /* printf("** FAILED **\n"); */
+    ret = -1;
+    goto _end;
+
+_out:
+    ret = 0;
+    goto _end;
+
+_end:
+    ctx->cs = cs;
+    ctx->trail = trail;
+    ctx->top = top;
+    *off = p - (const unsigned char*)data;
+
+    return ret;
+#undef construct_cb
+}
+
+#undef SWITCH_RANGE_BEGIN
+#undef SWITCH_RANGE
+#undef SWITCH_RANGE_DEFAULT
+#undef SWITCH_RANGE_END
+#undef push_simple_value
+#undef push_fixed_value
+#undef push_variable_value
+#undef again_fixed_trail
+#undef again_fixed_trail_if_zero
+#undef start_container
+#undef construct
+
+#undef NEXT_CS
+
+/* vim: set ts=4 sw=4 sts=4 expandtab  */

+ 1 - 1
src/borg/fuse.py

@@ -340,7 +340,7 @@ class FuseOperations(llfuse.Operations):
                     # evict fully read chunk from cache
                     del self.data_cache[id]
             else:
-                data = self.key.decrypt(id, self.repository.get(id))
+                data = self.key.decrypt(id, self.repository_uncached.get(id))
                 if offset + n < len(data):
                     # chunk was only partially read, cache it
                     self.data_cache[id] = data

+ 45 - 4
src/borg/hashindex.pyx

@@ -7,8 +7,9 @@ cimport cython
 from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
 from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
+from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 
 cdef extern from "_hashindex.c":
@@ -31,6 +32,18 @@ cdef extern from "_hashindex.c":
     double HASH_MAX_LOAD
 
 
+cdef extern from "cache_sync/cache_sync.c":
+    ctypedef struct CacheSyncCtx:
+        pass
+
+    CacheSyncCtx *cache_sync_init(HashIndex *chunks)
+    const char *cache_sync_error(CacheSyncCtx *ctx)
+    int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
+    void cache_sync_free(CacheSyncCtx *ctx)
+
+    uint32_t _MAX_VALUE
+
+
 cdef _NoDefault = object()
 
 """
@@ -50,9 +63,6 @@ AssertionError is raised instead.
 
 assert UINT32_MAX == 2**32-1
 
-# module-level constant because cdef's in classes can't have default values
-cdef uint32_t _MAX_VALUE = 2**32-1025
-
 assert _MAX_VALUE % 2 == 1
 
 
@@ -375,3 +385,34 @@ cdef class ChunkKeyIterator:
         cdef uint32_t refcount = _le32toh(value[0])
         assert refcount <= _MAX_VALUE, "invalid reference count"
         return (<char *>self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2]))
+
+
+cdef Py_buffer ro_buffer(object data) except *:
+    cdef Py_buffer view
+    PyObject_GetBuffer(data, &view, PyBUF_SIMPLE)
+    return view
+
+
+cdef class CacheSynchronizer:
+    cdef ChunkIndex chunks
+    cdef CacheSyncCtx *sync
+
+    def __cinit__(self, chunks):
+        self.chunks = chunks
+        self.sync = cache_sync_init(self.chunks.index)
+        if not self.sync:
+            raise Exception('cache_sync_init failed')
+
+    def __dealloc__(self):
+        if self.sync:
+            cache_sync_free(self.sync)
+
+    def feed(self, chunk):
+        cdef Py_buffer chunk_buf = ro_buffer(chunk)
+        cdef int rc
+        rc = cache_sync_feed(self.sync, chunk_buf.buf, chunk_buf.len)
+        PyBuffer_Release(&chunk_buf)
+        if not rc:
+            error = cache_sync_error(self.sync)
+            if error != NULL:
+                raise ValueError('cache_sync_feed failed: ' + error.decode('ascii'))

+ 1 - 1
src/borg/helpers.py

@@ -125,7 +125,7 @@ def check_python():
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_01':
+    if hashindex.API_VERSION != '1.1_02':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError

+ 144 - 30
src/borg/remote.py

@@ -7,6 +7,8 @@ import logging
 import os
 import select
 import shlex
+import shutil
+import struct
 import sys
 import tempfile
 import textwrap
@@ -17,15 +19,18 @@ from subprocess import Popen, PIPE
 import msgpack

 from . import __version__
+from .compress import LZ4
 from .helpers import Error, IntegrityError
 from .helpers import bin_to_hex
 from .helpers import get_home_dir
 from .helpers import hostname_is_unique
 from .helpers import replace_placeholders
 from .helpers import sysinfo
+from .helpers import format_file_size
 from .logger import create_logger, setup_logging
 from .repository import Repository, MAX_OBJECT_SIZE, LIST_SCAN_LIMIT
 from .version import parse_version, format_version
+from .algorithms.checksums import xxh64

 logger = create_logger(__name__)

@@ -1057,9 +1062,14 @@ class RepositoryNoCache:
     """A not caching Repository wrapper, passes through to repository.
     """A not caching Repository wrapper, passes through to repository.
 
 
     Just to have same API (including the context manager) as RepositoryCache.
     Just to have same API (including the context manager) as RepositoryCache.
+
+    *transform* is a callable taking two arguments, key and raw repository data.
+    The return value is returned from get()/get_many(). By default, the raw
+    repository data is returned.
     """
     """
-    def __init__(self, repository):
+    def __init__(self, repository, transform=None):
         self.repository = repository
         self.repository = repository
+        self.transform = transform or (lambda key, data: data)

     def close(self):
         pass
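
For example, a transform that tags each object with its stored (compressed/encrypted)
size could be passed in like this (hypothetical sketch; assumes an open repository
object):

  def sized(key, data):
      return len(data), data

  repo_view = RepositoryNoCache(repository, transform=sized)
  # repo_view.get()/get_many() now yield (csize, raw_data) tuples

cache_if_remote() further down installs a very similar transform (plus decryption)
when a decrypted_cache key is given.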
@@ -1071,52 +1081,156 @@ class RepositoryNoCache:
         self.close()

     def get(self, key):
-        return next(self.get_many([key]))
+        return next(self.get_many([key], cache=False))

-    def get_many(self, keys):
-        for data in self.repository.get_many(keys):
-            yield data
+    def get_many(self, keys, cache=True):
+        for key, data in zip(keys, self.repository.get_many(keys)):
+            yield self.transform(key, data)


 class RepositoryCache(RepositoryNoCache):
-    """A caching Repository wrapper
+    """
+    A caching Repository wrapper.

-    Caches Repository GET operations using a local temporary Repository.
+    Caches Repository GET operations locally.
+
+    *pack* and *unpack* complement *transform* of the base class.
+    *pack* receives the output of *transform* and should return bytes,
+    which are stored in the cache. *unpack* receives these bytes and
+    should return the initial data (as returned by *transform*).
     """
     """
-    # maximum object size that will be cached, 64 kiB.
-    THRESHOLD = 2**16
 
 
-    def __init__(self, repository):
-        super().__init__(repository)
-        tmppath = tempfile.mkdtemp(prefix='borg-tmp')
-        self.caching_repo = Repository(tmppath, create=True, exclusive=True)
-        self.caching_repo.__enter__()  # handled by context manager in base class
+    def __init__(self, repository, pack=None, unpack=None, transform=None):
+        super().__init__(repository, transform)
+        self.pack = pack or (lambda data: data)
+        self.unpack = unpack or (lambda data: data)
+        self.cache = set()
+        self.basedir = tempfile.mkdtemp(prefix='borg-cache-')
+        self.query_size_limit()
+        self.size = 0
+        # Instrumentation
+        self.hits = 0
+        self.misses = 0
+        self.slow_misses = 0
+        self.slow_lat = 0.0
+        self.evictions = 0
+        self.enospc = 0
+
+    def query_size_limit(self):
+        stat_fs = os.statvfs(self.basedir)
+        available_space = stat_fs.f_bsize * stat_fs.f_bavail
+        self.size_limit = int(min(available_space * 0.25, 2**31))
+
+    def key_filename(self, key):
+        return os.path.join(self.basedir, bin_to_hex(key))
+
+    def backoff(self):
+        self.query_size_limit()
+        target_size = int(0.9 * self.size_limit)
+        while self.size > target_size and self.cache:
+            key = self.cache.pop()
+            file = self.key_filename(key)
+            self.size -= os.stat(file).st_size
+            os.unlink(file)
+            self.evictions += 1
+
+    def add_entry(self, key, data, cache):
+        transformed = self.transform(key, data)
+        if not cache:
+            return transformed
+        packed = self.pack(transformed)
+        file = self.key_filename(key)
+        try:
+            with open(file, 'wb') as fd:
+                fd.write(packed)
+        except OSError as os_error:
+            if os_error.errno == errno.ENOSPC:
+                self.enospc += 1
+                self.backoff()
+            else:
+                raise
+        else:
+            self.size += len(packed)
+            self.cache.add(key)
+            if self.size > self.size_limit:
+                self.backoff()
+        return transformed

     def close(self):
-        if self.caching_repo is not None:
-            self.caching_repo.destroy()
-            self.caching_repo = None
-
-    def get_many(self, keys):
-        unknown_keys = [key for key in keys if key not in self.caching_repo]
+        logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), '
+                     '%d evictions, %d ENOSPC hit',
+                     len(self.cache), format_file_size(self.size), format_file_size(self.size_limit),
+                     self.hits, self.misses, self.slow_misses, self.slow_lat,
+                     self.evictions, self.enospc)
+        self.cache.clear()
+        shutil.rmtree(self.basedir)
+
+    def get_many(self, keys, cache=True):
+        unknown_keys = [key for key in keys if key not in self.cache]
         repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
         for key in keys:
-            try:
-                yield self.caching_repo.get(key)
-            except Repository.ObjectNotFound:
+            if key in self.cache:
+                file = self.key_filename(key)
+                with open(file, 'rb') as fd:
+                    self.hits += 1
+                    yield self.unpack(fd.read())
+            else:
                 for key_, data in repository_iterator:
                     if key_ == key:
-                        if len(data) <= self.THRESHOLD:
-                            self.caching_repo.put(key, data)
-                        yield data
+                        transformed = self.add_entry(key, data, cache)
+                        self.misses += 1
+                        yield transformed
                         break
+                else:
+                    # slow path: eviction during this get_many removed this key from the cache
+                    t0 = time.perf_counter()
+                    data = self.repository.get(key)
+                    self.slow_lat += time.perf_counter() - t0
+                    transformed = self.add_entry(key, data, cache)
+                    self.slow_misses += 1
+                    yield transformed
         # Consume any pending requests
         for _ in repository_iterator:
             pass
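
The cache sizing above is opportunistic: query_size_limit() allows at most
min(25 % of the free space on the filesystem holding the cache directory, 2 GiB),
the limit is re-queried on every backoff(), and eviction pops entries until the
cache is below 90 % of it; an ENOSPC while writing an entry is absorbed the same
way. A small worked example (numbers purely illustrative):

  # 4 GiB free on the cache filesystem
  size_limit = int(min(4 * 2**30 * 0.25, 2**31))    # 1 GiB; the 2 GiB cap does not apply
  target_size = int(0.9 * size_limit)               # backoff() evicts down to ~0.9 GiB

  # 100 GiB free
  size_limit = int(min(100 * 2**30 * 0.25, 2**31))  # capped at 2 GiB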
 
 
 
 
-def cache_if_remote(repository):
-    if isinstance(repository, RemoteRepository):
-        return RepositoryCache(repository)
+def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
+    """
+    Return a Repository(No)Cache for *repository*.
+
+    If *decrypted_cache* is a key object, then get and get_many will return a tuple
+    (csize, plaintext) instead of the actual data in the repository. The cache will
+    store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
+    and more importantly MAC and ID checking cached objects).
+    Internally, objects are compressed with LZ4.
+    """
+    if decrypted_cache and (pack or unpack or transform):
+        raise ValueError('decrypted_cache and pack/unpack/transform are incompatible')
+    elif decrypted_cache:
+        key = decrypted_cache
+        # 32 bit csize, 64 bit (8 byte) xxh64
+        cache_struct = struct.Struct('=I8s')
+        compressor = LZ4()
+
+        def pack(data):
+            csize, decrypted = data
+            compressed = compressor.compress(decrypted)
+            return cache_struct.pack(csize, xxh64(compressed)) + compressed
+
+        def unpack(data):
+            data = memoryview(data)
+            csize, checksum = cache_struct.unpack(data[:cache_struct.size])
+            compressed = data[cache_struct.size:]
+            if checksum != xxh64(compressed):
+                raise IntegrityError('detected corrupted data in metadata cache')
+            return csize, compressor.decompress(compressed)
+
+        def transform(id_, data):
+            csize = len(data)
+            decrypted = key.decrypt(id_, data)
+            return csize, decrypted
+
+    if isinstance(repository, RemoteRepository) or force_cache:
+        return RepositoryCache(repository, pack, unpack, transform)
     else:
-        return RepositoryNoCache(repository)
+        return RepositoryNoCache(repository, transform)
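
With decrypted_cache, each cache file consists of a 12-byte struct('=I8s') header
holding the csize and the xxh64 of the LZ4 payload, followed by the LZ4-compressed
plaintext; unpack() verifies the checksum before decompressing. A stand-alone sketch
of that layout, mirroring the pack()/unpack() closures above (illustrative only):

  import struct

  from borg.compress import LZ4
  from borg.algorithms.checksums import xxh64

  cache_struct = struct.Struct('=I8s')
  compressor = LZ4()

  def pack_entry(csize, plaintext):
      compressed = compressor.compress(plaintext)
      return cache_struct.pack(csize, xxh64(compressed)) + compressed

  def unpack_entry(blob):
      blob = memoryview(blob)
      csize, checksum = cache_struct.unpack(blob[:cache_struct.size])
      compressed = blob[cache_struct.size:]
      if checksum != xxh64(compressed):
          raise ValueError('corrupted cache entry')
      return csize, compressor.decompress(compressed)

  assert unpack_entry(pack_entry(42, b'metadata')) == (42, b'metadata')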

+ 198 - 0
src/borg/testsuite/cache.py

@@ -0,0 +1,198 @@
+import io
+
+from msgpack import packb
+
+import pytest
+
+from ..hashindex import ChunkIndex, CacheSynchronizer
+from .hashindex import H
+
+
+class TestCacheSynchronizer:
+    @pytest.fixture
+    def index(self):
+        return ChunkIndex()
+
+    @pytest.fixture
+    def sync(self, index):
+        return CacheSynchronizer(index)
+
+    def test_no_chunks(self, index, sync):
+        data = packb({
+            'foo': 'bar',
+            'baz': 1234,
+            'bar': 5678,
+            'user': 'chunks',
+            'chunks': []
+        })
+        sync.feed(data)
+        assert not len(index)
+
+    def test_simple(self, index, sync):
+        data = packb({
+            'foo': 'bar',
+            'baz': 1234,
+            'bar': 5678,
+            'user': 'chunks',
+            'chunks': [
+                (H(1), 1, 2),
+                (H(2), 2, 3),
+            ]
+        })
+        sync.feed(data)
+        assert len(index) == 2
+        assert index[H(1)] == (1, 1, 2)
+        assert index[H(2)] == (1, 2, 3)
+
+    def test_multiple(self, index, sync):
+        data = packb({
+            'foo': 'bar',
+            'baz': 1234,
+            'bar': 5678,
+            'user': 'chunks',
+            'chunks': [
+                (H(1), 1, 2),
+                (H(2), 2, 3),
+            ]
+        })
+        data += packb({
+            'xattrs': {
+                'security.foo': 'bar',
+                'chunks': '123456',
+            },
+            'stuff': [
+                (1, 2, 3),
+            ]
+        })
+        data += packb({
+            'xattrs': {
+                'security.foo': 'bar',
+                'chunks': '123456',
+            },
+            'chunks': [
+                (H(1), 1, 2),
+                (H(2), 2, 3),
+            ],
+            'stuff': [
+                (1, 2, 3),
+            ]
+        })
+        data += packb({
+            'chunks': [
+                (H(3), 1, 2),
+            ],
+        })
+        data += packb({
+            'chunks': [
+                (H(1), 1, 2),
+            ],
+        })
+
+        part1 = data[:70]
+        part2 = data[70:120]
+        part3 = data[120:]
+        sync.feed(part1)
+        sync.feed(part2)
+        sync.feed(part3)
+        assert len(index) == 3
+        assert index[H(1)] == (3, 1, 2)
+        assert index[H(2)] == (2, 2, 3)
+        assert index[H(3)] == (1, 1, 2)
+
+    @pytest.mark.parametrize('elem,error', (
+        ({1: 2}, 'Unexpected object: map'),
+        (bytes(213), [
+            'Unexpected bytes in chunks structure',  # structure 2/3
+            'Incorrect key length']),                # structure 3/3
+        (1, 'Unexpected object: integer'),
+        (1.0, 'Unexpected object: double'),
+        (True, 'Unexpected object: true'),
+        (False, 'Unexpected object: false'),
+        (None, 'Unexpected object: nil'),
+    ))
+    @pytest.mark.parametrize('structure', (
+        lambda elem: {'chunks': elem},
+        lambda elem: {'chunks': [elem]},
+        lambda elem: {'chunks': [(elem, 1, 2)]},
+    ))
+    def test_corrupted(self, sync, structure, elem, error):
+        packed = packb(structure(elem))
+        with pytest.raises(ValueError) as excinfo:
+            sync.feed(packed)
+        if isinstance(error, str):
+            error = [error]
+        possible_errors = ['cache_sync_feed failed: ' + error for error in error]
+        assert str(excinfo.value) in possible_errors
+
+    @pytest.mark.parametrize('data,error', (
+        # Incorrect tuple length
+        ({'chunks': [(bytes(32), 2, 3, 4)]}, 'Invalid chunk list entry length'),
+        ({'chunks': [(bytes(32), 2)]}, 'Invalid chunk list entry length'),
+        # Incorrect types
+        ({'chunks': [(1, 2, 3)]}, 'Unexpected object: integer'),
+        ({'chunks': [(1, bytes(32), 2)]}, 'Unexpected object: integer'),
+        ({'chunks': [(bytes(32), 1.0, 2)]}, 'Unexpected object: double'),
+    ))
+    def test_corrupted_ancillary(self, index, sync, data, error):
+        packed = packb(data)
+        with pytest.raises(ValueError) as excinfo:
+            sync.feed(packed)
+        assert str(excinfo.value) == 'cache_sync_feed failed: ' + error
+
+    def make_index_with_refcount(self, refcount):
+        index_data = io.BytesIO()
+        index_data.write(b'BORG_IDX')
+        # num_entries
+        index_data.write((1).to_bytes(4, 'little'))
+        # num_buckets
+        index_data.write((1).to_bytes(4, 'little'))
+        # key_size
+        index_data.write((32).to_bytes(1, 'little'))
+        # value_size
+        index_data.write((3 * 4).to_bytes(1, 'little'))
+
+        index_data.write(H(0))
+        index_data.write(refcount.to_bytes(4, 'little'))
+        index_data.write((1234).to_bytes(4, 'little'))
+        index_data.write((5678).to_bytes(4, 'little'))
+
+        index_data.seek(0)
+        index = ChunkIndex.read(index_data)
+        return index
+
+    def test_corrupted_refcount(self):
+        index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE + 1)
+        sync = CacheSynchronizer(index)
+        data = packb({
+            'chunks': [
+                (H(0), 1, 2),
+            ]
+        })
+        with pytest.raises(ValueError) as excinfo:
+            sync.feed(data)
+        assert str(excinfo.value) == 'cache_sync_feed failed: invalid reference count'
+
+    def test_refcount_max_value(self):
+        index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE)
+        sync = CacheSynchronizer(index)
+        data = packb({
+            'chunks': [
+                (H(0), 1, 2),
+            ]
+        })
+        sync.feed(data)
+        assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678)
+
+    def test_refcount_one_below_max_value(self):
+        index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE - 1)
+        sync = CacheSynchronizer(index)
+        data = packb({
+            'chunks': [
+                (H(0), 1, 2),
+            ]
+        })
+        sync.feed(data)
+        # Incremented to maximum
+        assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678)
+        sync.feed(data)
+        assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678)
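
make_index_with_refcount() above writes the minimal on-disk hashindex layout by hand:
the 'BORG_IDX' magic, little-endian 32-bit num_entries and num_buckets, one-byte key
and value sizes, followed by a single bucket (32-byte key plus three 32-bit LE values).
Expressed as a struct format (sketch, header only):

  import struct

  INDEX_HEADER = struct.Struct('<8sIIBB')   # magic, num_entries, num_buckets, key_size, value_size
  header = INDEX_HEADER.pack(b'BORG_IDX', 1, 1, 32, 3 * 4)
  assert len(header) == 18                  # 8 + 4 + 4 + 1 + 1 bytes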

+ 139 - 1
src/borg/testsuite/remote.py

@@ -1,9 +1,18 @@
+import errno
 import os
+import io
 import time
+from unittest.mock import patch

 import pytest

-from ..remote import SleepingBandwidthLimiter
+from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote
+from ..repository import Repository
+from ..crypto.key import PlaintextKey
+from ..compress import CompressionSpec
+from ..helpers import IntegrityError
+from .hashindex import H
+from .key import TestKey


 class TestSleepingBandwidthLimiter:
@@ -58,3 +67,132 @@ class TestSleepingBandwidthLimiter:
         now += 10
         self.expect_write(5, b"1")
         it.write(5, b"1")
+
+
+class TestRepositoryCache:
+    @pytest.yield_fixture
+    def repository(self, tmpdir):
+        self.repository_location = os.path.join(str(tmpdir), 'repository')
+        with Repository(self.repository_location, exclusive=True, create=True) as repository:
+            repository.put(H(1), b'1234')
+            repository.put(H(2), b'5678')
+            repository.put(H(3), bytes(100))
+            yield repository
+
+    @pytest.fixture
+    def cache(self, repository):
+        return RepositoryCache(repository)
+
+    def test_simple(self, cache: RepositoryCache):
+        # Single get()s are not cached, since they are used for unique objects like archives.
+        assert cache.get(H(1)) == b'1234'
+        assert cache.misses == 1
+        assert cache.hits == 0
+
+        assert list(cache.get_many([H(1)])) == [b'1234']
+        assert cache.misses == 2
+        assert cache.hits == 0
+
+        assert list(cache.get_many([H(1)])) == [b'1234']
+        assert cache.misses == 2
+        assert cache.hits == 1
+
+        assert cache.get(H(1)) == b'1234'
+        assert cache.misses == 2
+        assert cache.hits == 2
+
+    def test_backoff(self, cache: RepositoryCache):
+        def query_size_limit():
+            cache.size_limit = 0
+
+        assert list(cache.get_many([H(1), H(2)])) == [b'1234', b'5678']
+        assert cache.misses == 2
+        assert cache.evictions == 0
+        iterator = cache.get_many([H(1), H(3), H(2)])
+        assert next(iterator) == b'1234'
+
+        # Force cache to back off
+        qsl = cache.query_size_limit
+        cache.query_size_limit = query_size_limit
+        cache.backoff()
+        cache.query_size_limit = qsl
+        # Evicted H(1) and H(2)
+        assert cache.evictions == 2
+        assert H(1) not in cache.cache
+        assert H(2) not in cache.cache
+        assert next(iterator) == bytes(100)
+        assert cache.slow_misses == 0
+        # Since H(2) was in the cache when we called get_many(), but has
+        # been evicted during iterating the generator, it will be a slow miss.
+        assert next(iterator) == b'5678'
+        assert cache.slow_misses == 1
+
+    def test_enospc(self, cache: RepositoryCache):
+        class enospc_open:
+            def __init__(self, *args):
+                pass
+
+            def __enter__(self):
+                return self
+
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                pass
+
+            def write(self, data):
+                raise OSError(errno.ENOSPC, 'foo')
+
+        iterator = cache.get_many([H(1), H(2), H(3)])
+        assert next(iterator) == b'1234'
+
+        with patch('builtins.open', enospc_open):
+            assert next(iterator) == b'5678'
+            assert cache.enospc == 1
+            # We didn't patch query_size_limit which would set size_limit to some low
+            # value, so nothing was actually evicted.
+            assert cache.evictions == 0
+
+        assert next(iterator) == bytes(100)
+
+    @pytest.fixture
+    def key(self, repository, monkeypatch):
+        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
+        key = PlaintextKey.create(repository, TestKey.MockArgs())
+        key.compressor = CompressionSpec('none').compressor
+        return key
+
+    def _put_encrypted_object(self, key, repository, data):
+        id_ = key.id_hash(data)
+        repository.put(id_, key.encrypt(data))
+        return id_
+
+    @pytest.fixture
+    def H1(self, key, repository):
+        return self._put_encrypted_object(key, repository, b'1234')
+
+    @pytest.fixture
+    def H2(self, key, repository):
+        return self._put_encrypted_object(key, repository, b'5678')
+
+    @pytest.fixture
+    def H3(self, key, repository):
+        return self._put_encrypted_object(key, repository, bytes(100))
+
+    @pytest.fixture
+    def decrypted_cache(self, key, repository):
+        return cache_if_remote(repository, decrypted_cache=key, force_cache=True)
+
+    def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3):
+        list(decrypted_cache.get_many([H1, H2, H3]))
+
+        iterator = decrypted_cache.get_many([H1, H2, H3])
+        assert next(iterator) == (7, b'1234')
+
+        with open(decrypted_cache.key_filename(H2), 'a+b') as fd:
+            fd.seek(-1, io.SEEK_END)
+            corrupted = (int.from_bytes(fd.read(), 'little') ^ 2).to_bytes(1, 'little')
+            fd.seek(-1, io.SEEK_END)
+            fd.write(corrupted)
+            fd.truncate()
+
+        with pytest.raises(IntegrityError):
+            assert next(iterator) == (7, b'5678')