
cache sync/remote: compressed, decrypted cache

Marian Beermann committed 8 years ago · commit 835b0e5ee0

2 changed files with 70 additions and 29 deletions:
  1. src/borg/cache.py (+8, -10)
  2. src/borg/remote.py (+62, -19)

src/borg/cache.py (+8, -10)

@@ -564,17 +564,15 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             except FileNotFoundError:
                 pass
 
-        def fetch_and_build_idx(archive_id, repository, key, chunk_idx):
-            cdata = repository.get(archive_id)
-            data = key.decrypt(archive_id, cdata)
-            chunk_idx.add(archive_id, 1, len(data), len(cdata))
+        def fetch_and_build_idx(archive_id, decrypted_repository, key, chunk_idx):
+            csize, data = decrypted_repository.get(archive_id)
+            chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
             if archive.version != 1:
                 raise Exception('Unknown archive metadata version')
             sync = CacheSynchronizer(chunk_idx)
-            for item_id, chunk in zip(archive.items, repository.get_many(archive.items)):
-                data = key.decrypt(item_id, chunk)
-                chunk_idx.add(item_id, 1, len(data), len(chunk))
+            for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
+                chunk_idx.add(item_id, 1, len(data), csize)
                 sync.feed(data)
             if self.do_cache:
                 fn = mkpath(archive_id)
@@ -641,7 +639,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                             # above can remove *archive_id* from *cached_ids*.
                             logger.info('Fetching and building archive index for %s ...', archive_name)
                             archive_chunk_idx = ChunkIndex()
-                            fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx)
+                            fetch_and_build_idx(archive_id, decrypted_repository, self.key, archive_chunk_idx)
                         logger.info("Merging into master chunks index ...")
                         if chunk_idx is None:
                             # we just use the first archive's idx as starting point,
@@ -653,7 +651,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                     else:
                         chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                         logger.info('Fetching archive index for %s ...', archive_name)
-                        fetch_and_build_idx(archive_id, repository, self.key, chunk_idx)
+                        fetch_and_build_idx(archive_id, decrypted_repository, self.key, chunk_idx)
                 if self.progress:
                     pi.finish()
             logger.info('Done.')
@@ -675,7 +673,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 pass
 
         self.begin_txn()
-        with cache_if_remote(self.repository) as repository:
+        with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository:
             legacy_cleanup()
             # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
             # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)

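For context (not part of the commit): a minimal sketch of the contract the reworked sync path relies on. With decrypted_cache set to a key object, get()/get_many() hand back (csize, plaintext) tuples, so callers no longer decrypt or measure the raw chunk themselves. Here repository, key, chunk_idx and archive_id stand in for the objects Cache.sync() already has.

    from borg.remote import cache_if_remote

    with cache_if_remote(repository, decrypted_cache=key) as decrypted_repository:
        # data is already decrypted and ID/MAC checked by the wrapper;
        # csize is the size of the raw (compressed, encrypted) chunk in the repository
        csize, data = decrypted_repository.get(archive_id)
        chunk_idx.add(archive_id, 1, len(data), csize)
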
src/borg/remote.py (+62, -19)

@@ -8,6 +8,7 @@ import os
 import select
 import shlex
 import shutil
+import struct
 import sys
 import tempfile
 import textwrap
@@ -18,6 +19,7 @@ from subprocess import Popen, PIPE
 import msgpack
 
 from . import __version__
+from .compress import LZ4
 from .helpers import Error, IntegrityError
 from .helpers import bin_to_hex
 from .helpers import get_home_dir
@@ -1046,9 +1048,14 @@ class RepositoryNoCache:
     """A not caching Repository wrapper, passes through to repository.
     """A not caching Repository wrapper, passes through to repository.
 
 
     Just to have same API (including the context manager) as RepositoryCache.
     Just to have same API (including the context manager) as RepositoryCache.
+
+    *transform* is a callable taking two arguments, key and raw repository data.
+    The return value is returned from get()/get_many(). By default, the raw
+    repository data is returned.
     """
     """
-    def __init__(self, repository):
+    def __init__(self, repository, transform=None):
         self.repository = repository
         self.repository = repository
+        self.transform = transform or (lambda key, data: data)
 
 
     def close(self):
     def close(self):
         pass
         pass
@@ -1063,8 +1070,8 @@ class RepositoryNoCache:
         return next(self.get_many([key], cache=False))
 
     def get_many(self, keys, cache=True):
-        for data in self.repository.get_many(keys):
-            yield data
+        for key, data in zip(keys, self.repository.get_many(keys)):
+            yield self.transform(key, data)
 
 
 class RepositoryCache(RepositoryNoCache):
@@ -1072,10 +1079,17 @@ class RepositoryCache(RepositoryNoCache):
     A caching Repository wrapper.
 
     Caches Repository GET operations locally.
+
+    *pack* and *unpack* complement *transform* of the base class.
+    *pack* receives the output of *transform* and should return bytes,
+    which are stored in the cache. *unpack* receives these bytes and
+    should return the initial data (as returned by *transform*).
     """
 
-    def __init__(self, repository):
-        super().__init__(repository)
+    def __init__(self, repository, pack=None, unpack=None, transform=None):
+        super().__init__(repository, transform)
+        self.pack = pack or (lambda data: data)
+        self.unpack = unpack or (lambda data: data)
         self.cache = set()
         self.basedir = tempfile.mkdtemp(prefix='borg-cache-')
         self.query_size_limit()
@@ -1106,11 +1120,15 @@ class RepositoryCache(RepositoryNoCache):
             os.unlink(file)
             self.evictions += 1
 
-    def add_entry(self, key, data):
+    def add_entry(self, key, data, cache):
+        transformed = self.transform(key, data)
+        if not cache:
+            return transformed
+        packed = self.pack(transformed)
         file = self.key_filename(key)
         try:
             with open(file, 'wb') as fd:
-                fd.write(data)
+                fd.write(packed)
         except OSError as os_error:
             if os_error.errno == errno.ENOSPC:
                 self.enospc += 1
@@ -1118,11 +1136,11 @@ class RepositoryCache(RepositoryNoCache):
             else:
                 raise
         else:
-            self.size += len(data)
+            self.size += len(packed)
             self.cache.add(key)
             if self.size > self.size_limit:
                 self.backoff()
-        return data
+        return transformed
 
     def close(self):
         logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), '
@@ -1141,31 +1159,56 @@ class RepositoryCache(RepositoryNoCache):
                 file = self.key_filename(key)
                 with open(file, 'rb') as fd:
                     self.hits += 1
-                    yield fd.read()
+                    yield self.unpack(fd.read())
             else:
                 for key_, data in repository_iterator:
                     if key_ == key:
-                        if cache:
-                            self.add_entry(key, data)
+                        transformed = self.add_entry(key, data, cache)
                         self.misses += 1
-                        yield data
+                        yield transformed
                         break
                 else:
                     # slow path: eviction during this get_many removed this key from the cache
                     t0 = time.perf_counter()
                     data = self.repository.get(key)
                     self.slow_lat += time.perf_counter() - t0
-                    if cache:
-                        self.add_entry(key, data)
+                    transformed = self.add_entry(key, data, cache)
                     self.slow_misses += 1
-                    yield data
+                    yield transformed
         # Consume any pending requests
         for _ in repository_iterator:
             pass
 
 
-def cache_if_remote(repository):
+def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None):
+    """
+    Return a Repository(No)Cache for *repository*.
+
+    If *decrypted_cache* is a key object, then get and get_many will return a tuple
+    (csize, plaintext) instead of the actual data in the repository. The cache will
+    store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
+    and more importantly MAC and ID checking cached objects).
+    Internally, objects are compressed with LZ4.
+    """
+    if decrypted_cache and (pack or unpack or transform):
+        raise ValueError('decrypted_cache and pack/unpack/transform are incompatible')
+    elif decrypted_cache:
+        key = decrypted_cache
+        cache_struct = struct.Struct('=I')
+        compressor = LZ4()
+
+        def pack(data):
+            return cache_struct.pack(data[0]) + compressor.compress(data[1])
+
+        def unpack(data):
+            return cache_struct.unpack(data[:cache_struct.size])[0], compressor.decompress(data[cache_struct.size:])
+
+        def transform(id_, data):
+            csize = len(data)
+            decrypted = key.decrypt(id_, data)
+            return csize, decrypted
+
     if isinstance(repository, RemoteRepository):
-        return RepositoryCache(repository)
+        return RepositoryCache(repository, pack, unpack, transform)
     else:
-        return RepositoryNoCache(repository)
+        return RepositoryNoCache(repository, transform)
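
Illustrative only, not part of the commit: the pack/unpack pair installed by decrypted_cache stores each cache entry as a 4-byte csize header followed by the LZ4-compressed plaintext. A small round-trip sketch, assuming borg's internal LZ4 wrapper and an example payload:

    import struct
    from borg.compress import LZ4

    cache_struct = struct.Struct('=I')   # 4-byte unsigned header holding csize
    compressor = LZ4()

    def pack(transformed):
        csize, plaintext = transformed
        return cache_struct.pack(csize) + compressor.compress(plaintext)

    def unpack(stored):
        csize = cache_struct.unpack(stored[:cache_struct.size])[0]
        return csize, compressor.decompress(stored[cache_struct.size:])

    # unpack() must invert pack(), since entries are read back from the on-disk cache
    assert unpack(pack((123, b'example plaintext'))) == (123, b'example plaintext')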