|
@@ -7,6 +7,7 @@ import logging
|
|
|
import os
|
|
|
import select
|
|
|
import shlex
|
|
|
+import shutil
|
|
|
import sys
|
|
|
import tempfile
|
|
|
import textwrap
|
|
@@ -23,6 +24,7 @@ from .helpers import get_home_dir
|
|
|
from .helpers import hostname_is_unique
|
|
|
from .helpers import replace_placeholders
|
|
|
from .helpers import sysinfo
|
|
|
+from .helpers import format_file_size
|
|
|
from .logger import create_logger, setup_logging
|
|
|
from .repository import Repository, MAX_OBJECT_SIZE, LIST_SCAN_LIMIT
|
|
|
from .version import parse_version, format_version
|
|
@@ -1058,45 +1060,105 @@ class RepositoryNoCache:
|
|
|
self.close()
|
|
|
|
|
|
def get(self, key):
|
|
|
- return next(self.get_many([key]))
|
|
|
+ return next(self.get_many([key], cache=False))
|
|
|
|
|
|
- def get_many(self, keys):
|
|
|
+ def get_many(self, keys, cache=True):
|
|
|
for data in self.repository.get_many(keys):
|
|
|
yield data
|
|
|
|
|
|
|
|
|
class RepositoryCache(RepositoryNoCache):
|
|
|
- """A caching Repository wrapper
|
|
|
+ """
|
|
|
+ A caching Repository wrapper.
|
|
|
|
|
|
- Caches Repository GET operations using a local temporary Repository.
|
|
|
+ Caches Repository GET operations locally.
|
|
|
"""
|
|
|
- # maximum object size that will be cached, 64 kiB.
|
|
|
- THRESHOLD = 2**16
|
|
|
|
|
|
def __init__(self, repository):
|
|
|
super().__init__(repository)
|
|
|
- tmppath = tempfile.mkdtemp(prefix='borg-tmp')
|
|
|
- self.caching_repo = Repository(tmppath, create=True, exclusive=True)
|
|
|
- self.caching_repo.__enter__() # handled by context manager in base class
|
|
|
+ self.cache = set()
|
|
|
+ self.basedir = tempfile.mkdtemp(prefix='borg-cache-')
|
|
|
+ self.query_size_limit()
|
|
|
+ self.size = 0
|
|
|
+ # Instrumentation
|
|
|
+ self.hits = 0
|
|
|
+ self.misses = 0
|
|
|
+ self.slow_misses = 0
|
|
|
+ self.slow_lat = 0.0
|
|
|
+ self.evictions = 0
|
|
|
+ self.enospc = 0
|
|
|
+
|
|
|
+ def query_size_limit(self):
|
|
|
+ stat_fs = os.statvfs(self.basedir)
|
|
|
+ available_space = stat_fs.f_bsize * stat_fs.f_bavail
|
|
|
+ self.size_limit = int(min(available_space * 0.25, 2**31))
|
|
|
+
|
|
|
+ def key_filename(self, key):
|
|
|
+ return os.path.join(self.basedir, bin_to_hex(key))
|
|
|
+
|
|
|
+ def backoff(self):
|
|
|
+ self.query_size_limit()
|
|
|
+ target_size = int(0.9 * self.size_limit)
|
|
|
+ while self.size > target_size and self.cache:
|
|
|
+ key = self.cache.pop()
|
|
|
+ file = self.key_filename(key)
|
|
|
+ self.size -= os.stat(file).st_size
|
|
|
+ os.unlink(file)
|
|
|
+ self.evictions += 1
|
|
|
+
|
|
|
+ def add_entry(self, key, data):
|
|
|
+ file = self.key_filename(key)
|
|
|
+ try:
|
|
|
+ with open(file, 'wb') as fd:
|
|
|
+ fd.write(data)
|
|
|
+ except OSError as os_error:
|
|
|
+ if os_error.errno == errno.ENOSPC:
|
|
|
+ self.enospc += 1
|
|
|
+ self.backoff()
|
|
|
+ else:
|
|
|
+ raise
|
|
|
+ else:
|
|
|
+ self.size += len(data)
|
|
|
+ self.cache.add(key)
|
|
|
+ if self.size > self.size_limit:
|
|
|
+ self.backoff()
|
|
|
+ return data
|
|
|
|
|
|
def close(self):
|
|
|
- if self.caching_repo is not None:
|
|
|
- self.caching_repo.destroy()
|
|
|
- self.caching_repo = None
|
|
|
-
|
|
|
- def get_many(self, keys):
|
|
|
- unknown_keys = [key for key in keys if key not in self.caching_repo]
|
|
|
+ logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), '
|
|
|
+ '%d evictions, %d ENOSPC hit',
|
|
|
+ len(self.cache), format_file_size(self.size), format_file_size(self.size_limit),
|
|
|
+ self.hits, self.misses, self.slow_misses, self.slow_lat,
|
|
|
+ self.evictions, self.enospc)
|
|
|
+ self.cache.clear()
|
|
|
+ shutil.rmtree(self.basedir)
|
|
|
+
|
|
|
+ def get_many(self, keys, cache=True):
|
|
|
+ unknown_keys = [key for key in keys if key not in self.cache]
|
|
|
repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
|
|
|
for key in keys:
|
|
|
- try:
|
|
|
- yield self.caching_repo.get(key)
|
|
|
- except Repository.ObjectNotFound:
|
|
|
+ if key in self.cache:
|
|
|
+ file = self.key_filename(key)
|
|
|
+ with open(file, 'rb') as fd:
|
|
|
+ self.hits += 1
|
|
|
+ yield fd.read()
|
|
|
+ else:
|
|
|
for key_, data in repository_iterator:
|
|
|
if key_ == key:
|
|
|
- if len(data) <= self.THRESHOLD:
|
|
|
- self.caching_repo.put(key, data)
|
|
|
+ if cache:
|
|
|
+ self.add_entry(key, data)
|
|
|
+ self.misses += 1
|
|
|
yield data
|
|
|
break
|
|
|
+ else:
|
|
|
+ # slow path: eviction during this get_many removed this key from the cache
|
|
|
+ t0 = time.perf_counter()
|
|
|
+ data = self.repository.get(key)
|
|
|
+ self.slow_lat += time.perf_counter() - t0
|
|
|
+ if cache:
|
|
|
+ self.add_entry(key, data)
|
|
|
+ self.slow_misses += 1
|
|
|
+ yield data
|
|
|
# Consume any pending requests
|
|
|
for _ in repository_iterator:
|
|
|
pass
|