ソースを参照

Merge pull request #565 from ThomasWaldmann/repocache-ctxmgr

Repocache ctxmgr
TW 9 年 前
コミット
704b505968
5 ファイル変更、88 行追加、62 行削除
  1. +33 −32
      borg/archive.py
  2. +14 −13
      borg/archiver.py
  3. +6 −6
      borg/cache.py
  4. +2 −3
      borg/fuse.py
  5. +33 −8
      borg/remote.py

+ 33 - 32
borg/archive.py

@@ -834,7 +834,6 @@ class ArchiveChecker:
                         raise
                     i += 1
 
-        repository = cache_if_remote(self.repository)
         if archive is None:
             # we need last N or all archives
             archive_items = sorted(self.manifest.archives.items(), reverse=True,
@@ -848,37 +847,39 @@ class ArchiveChecker:
             archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
             num_archives = 1
             end = 1
-        for i, (name, info) in enumerate(archive_items[:end]):
-            logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
-            archive_id = info[b'id']
-            if archive_id not in self.chunks:
-                logger.error('Archive metadata block is missing!')
-                self.error_found = True
-                del self.manifest.archives[name]
-                continue
-            mark_as_possibly_superseded(archive_id)
-            cdata = self.repository.get(archive_id)
-            data = self.key.decrypt(archive_id, cdata)
-            archive = StableDict(msgpack.unpackb(data))
-            if archive[b'version'] != 1:
-                raise Exception('Unknown archive metadata version')
-            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))
-            archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
-            items_buffer = ChunkBuffer(self.key)
-            items_buffer.write_chunk = add_callback
-            for item in robust_iterator(archive):
-                if b'chunks' in item:
-                    verify_file_chunks(item)
-                items_buffer.add(item)
-            items_buffer.flush(flush=True)
-            for previous_item_id in archive[b'items']:
-                mark_as_possibly_superseded(previous_item_id)
-            archive[b'items'] = items_buffer.chunks
-            data = msgpack.packb(archive, unicode_errors='surrogateescape')
-            new_archive_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(data)
-            add_reference(new_archive_id, len(data), len(cdata), cdata)
-            info[b'id'] = new_archive_id
+
+        with cache_if_remote(self.repository) as repository:
+            for i, (name, info) in enumerate(archive_items[:end]):
+                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
+                archive_id = info[b'id']
+                if archive_id not in self.chunks:
+                    logger.error('Archive metadata block is missing!')
+                    self.error_found = True
+                    del self.manifest.archives[name]
+                    continue
+                mark_as_possibly_superseded(archive_id)
+                cdata = self.repository.get(archive_id)
+                data = self.key.decrypt(archive_id, cdata)
+                archive = StableDict(msgpack.unpackb(data))
+                if archive[b'version'] != 1:
+                    raise Exception('Unknown archive metadata version')
+                decode_dict(archive, (b'name', b'hostname', b'username', b'time'))
+                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
+                items_buffer = ChunkBuffer(self.key)
+                items_buffer.write_chunk = add_callback
+                for item in robust_iterator(archive):
+                    if b'chunks' in item:
+                        verify_file_chunks(item)
+                    items_buffer.add(item)
+                items_buffer.flush(flush=True)
+                for previous_item_id in archive[b'items']:
+                    mark_as_possibly_superseded(previous_item_id)
+                archive[b'items'] = items_buffer.chunks
+                data = msgpack.packb(archive, unicode_errors='surrogateescape')
+                new_archive_id = self.key.id_hash(data)
+                cdata = self.key.encrypt(data)
+                add_reference(new_archive_id, len(data), len(cdata), cdata)
+                info[b'id'] = new_archive_id
 
     def orphan_chunks_check(self):
         if self.check_all:

+ 14 - 13
borg/archiver.py

@@ -30,7 +30,7 @@ from .repository import Repository
 from .cache import Cache
 from .key import key_creator
 from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS
-from .remote import RepositoryServer, RemoteRepository
+from .remote import RepositoryServer, RemoteRepository, cache_if_remote
 
 has_lchflags = hasattr(os, 'lchflags')
 
@@ -380,18 +380,19 @@ class Archiver:
 
         repository = self.open_repository(args)
         try:
-            manifest, key = Manifest.load(repository)
-            if args.location.archive:
-                archive = Archive(repository, key, manifest, args.location.archive)
-            else:
-                archive = None
-            operations = FuseOperations(key, repository, manifest, archive)
-            logger.info("Mounting filesystem")
-            try:
-                operations.mount(args.mountpoint, args.options, args.foreground)
-            except RuntimeError:
-                # Relevant error message already printed to stderr by fuse
-                self.exit_code = EXIT_ERROR
+            with cache_if_remote(repository) as cached_repo:
+                manifest, key = Manifest.load(repository)
+                if args.location.archive:
+                    archive = Archive(repository, key, manifest, args.location.archive)
+                else:
+                    archive = None
+                operations = FuseOperations(key, repository, manifest, archive, cached_repo)
+                logger.info("Mounting filesystem")
+                try:
+                    operations.mount(args.mountpoint, args.options, args.foreground)
+                except RuntimeError:
+                    # Relevant error message already printed to stderr by fuse
+                    self.exit_code = EXIT_ERROR
         finally:
             repository.close()
         return self.exit_code

+ 6 - 6
borg/cache.py

@@ -340,12 +340,12 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 pass
 
         self.begin_txn()
-        repository = cache_if_remote(self.repository)
-        legacy_cleanup()
-        # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
-        # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
-        self.do_cache = os.path.isdir(archive_path)
-        self.chunks = create_master_idx(self.chunks)
+        with cache_if_remote(self.repository) as repository:
+            legacy_cleanup()
+            # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
+            # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
+            self.do_cache = os.path.isdir(archive_path)
+            self.chunks = create_master_idx(self.chunks)
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active:

+ 2 - 3
borg/fuse.py

@@ -8,7 +8,6 @@ import tempfile
 import time
 from .archive import Archive
 from .helpers import daemonize
-from .remote import cache_if_remote
 
 import msgpack
 
@@ -34,11 +33,11 @@ class ItemCache:
 class FuseOperations(llfuse.Operations):
     """Export archive as a fuse filesystem
     """
-    def __init__(self, key, repository, manifest, archive):
+    def __init__(self, key, repository, manifest, archive, cached_repo):
         super().__init__()
         self._inode_count = 0
         self.key = key
-        self.repository = cache_if_remote(repository)
+        self.repository = cached_repo
         self.items = {}
         self.parent = {}
         self.contents = defaultdict(dict)

+ 33 - 8
borg/remote.py

@@ -359,22 +359,46 @@ class RemoteRepository:
         self.preload_ids += ids
 
 
-class RepositoryCache:
-    """A caching Repository wrapper
+class RepositoryNoCache:
+    """A not caching Repository wrapper, passes through to repository.
 
-    Caches Repository GET operations using a local temporary Repository.
+    Just to have same API (including the context manager) as RepositoryCache.
     """
     def __init__(self, repository):
         self.repository = repository
-        tmppath = tempfile.mkdtemp(prefix='borg-tmp')
-        self.caching_repo = Repository(tmppath, create=True, exclusive=True)
 
-    def __del__(self):
-        self.caching_repo.destroy()
+    def close(self):
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
 
     def get(self, key):
         return next(self.get_many([key]))
 
+    def get_many(self, keys):
+        for data in self.repository.get_many(keys):
+            yield data
+
+
+class RepositoryCache(RepositoryNoCache):
+    """A caching Repository wrapper
+
+    Caches Repository GET operations using a local temporary Repository.
+    """
+    def __init__(self, repository):
+        super().__init__(repository)
+        tmppath = tempfile.mkdtemp(prefix='borg-tmp')
+        self.caching_repo = Repository(tmppath, create=True, exclusive=True)
+
+    def close(self):
+        if self.caching_repo is not None:
+            self.caching_repo.destroy()
+            self.caching_repo = None
+
     def get_many(self, keys):
         unknown_keys = [key for key in keys if key not in self.caching_repo]
         repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
@@ -395,4 +419,5 @@ class RepositoryCache:
 def cache_if_remote(repository):
     if isinstance(repository, RemoteRepository):
         return RepositoryCache(repository)
-    return repository
+    else:
+        return RepositoryNoCache(repository)