
implement and use context manager for RepositoryCache, fixes #548

Thomas Waldmann, 9 years ago (commit 4d73f3cdb9)
4 changed files with 73 additions and 46 deletions:

  1. borg/archive.py  (+33 -32)
  2. borg/cache.py    (+6 -6)
  3. borg/fuse.py     (+1 -0)
  4. borg/remote.py   (+33 -8)
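
Background: before this commit, RepositoryCache relied on __del__ to destroy its temporary cache repository. CPython makes no guarantee about when (or, at interpreter shutdown, whether) __del__ runs, so the temporary directory could linger. The commit replaces that with an explicit close() plus the context-manager protocol, and adds a pass-through RepositoryNoCache so local repositories expose the same API. A minimal sketch of the before/after call pattern (simplified, not the exact borg code):

    # before: cleanup depended on garbage collection of the wrapper
    repository = cache_if_remote(self.repository)
    ...  # temp cache repo is destroyed only when __del__ eventually runs

    # after: deterministic cleanup, even if the body raises
    with cache_if_remote(self.repository) as repository:
        ...  # use repository
    # __exit__ -> close() -> caching_repo.destroy() runs here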

borg/archive.py (+33 -32)

@@ -829,7 +829,6 @@ class ArchiveChecker:
                         raise
                     i += 1
 
-        repository = cache_if_remote(self.repository)
         if archive is None:
             # we need last N or all archives
             archive_items = sorted(self.manifest.archives.items(), reverse=True,
@@ -843,37 +842,39 @@ class ArchiveChecker:
             archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
             num_archives = 1
             end = 1
-        for i, (name, info) in enumerate(archive_items[:end]):
-            logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
-            archive_id = info[b'id']
-            if archive_id not in self.chunks:
-                logger.error('Archive metadata block is missing!')
-                self.error_found = True
-                del self.manifest.archives[name]
-                continue
-            mark_as_possibly_superseded(archive_id)
-            cdata = self.repository.get(archive_id)
-            data = self.key.decrypt(archive_id, cdata)
-            archive = StableDict(msgpack.unpackb(data))
-            if archive[b'version'] != 1:
-                raise Exception('Unknown archive metadata version')
-            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))
-            archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
-            items_buffer = ChunkBuffer(self.key)
-            items_buffer.write_chunk = add_callback
-            for item in robust_iterator(archive):
-                if b'chunks' in item:
-                    verify_file_chunks(item)
-                items_buffer.add(item)
-            items_buffer.flush(flush=True)
-            for previous_item_id in archive[b'items']:
-                mark_as_possibly_superseded(previous_item_id)
-            archive[b'items'] = items_buffer.chunks
-            data = msgpack.packb(archive, unicode_errors='surrogateescape')
-            new_archive_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(data)
-            add_reference(new_archive_id, len(data), len(cdata), cdata)
-            info[b'id'] = new_archive_id
+
+        with cache_if_remote(self.repository) as repository:
+            for i, (name, info) in enumerate(archive_items[:end]):
+                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
+                archive_id = info[b'id']
+                if archive_id not in self.chunks:
+                    logger.error('Archive metadata block is missing!')
+                    self.error_found = True
+                    del self.manifest.archives[name]
+                    continue
+                mark_as_possibly_superseded(archive_id)
+                cdata = self.repository.get(archive_id)
+                data = self.key.decrypt(archive_id, cdata)
+                archive = StableDict(msgpack.unpackb(data))
+                if archive[b'version'] != 1:
+                    raise Exception('Unknown archive metadata version')
+                decode_dict(archive, (b'name', b'hostname', b'username', b'time'))
+                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
+                items_buffer = ChunkBuffer(self.key)
+                items_buffer.write_chunk = add_callback
+                for item in robust_iterator(archive):
+                    if b'chunks' in item:
+                        verify_file_chunks(item)
+                    items_buffer.add(item)
+                items_buffer.flush(flush=True)
+                for previous_item_id in archive[b'items']:
+                    mark_as_possibly_superseded(previous_item_id)
+                archive[b'items'] = items_buffer.chunks
+                data = msgpack.packb(archive, unicode_errors='surrogateescape')
+                new_archive_id = self.key.id_hash(data)
+                cdata = self.key.encrypt(data)
+                add_reference(new_archive_id, len(data), len(cdata), cdata)
+                info[b'id'] = new_archive_id
 
     def orphan_chunks_check(self):
         if self.check_all:
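
Note on the archive.py hunk: the archive-rewriting loop is only re-indented into the with-block; its body is unchanged. The with-statement guarantees the cached repository wrapper is closed even when an archive raises during checking. Roughly, it desugars to a try/finally (a simplified sketch of Python's context-manager semantics, not code from this commit):

    cm = cache_if_remote(self.repository)
    repository = cm.__enter__()
    try:
        for i, (name, info) in enumerate(archive_items[:end]):
            ...  # rewrite each archive as before
    finally:
        cm.__exit__(None, None, None)  # calls close(); in the real protocol
                                       # it receives exception info if raised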

borg/cache.py (+6 -6)

@@ -340,12 +340,12 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
                 pass
 
         self.begin_txn()
-        repository = cache_if_remote(self.repository)
-        legacy_cleanup()
-        # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
-        # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
-        self.do_cache = os.path.isdir(archive_path)
-        self.chunks = create_master_idx(self.chunks)
+        with cache_if_remote(self.repository) as repository:
+            legacy_cleanup()
+            # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
+            # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
+            self.do_cache = os.path.isdir(archive_path)
+            self.chunks = create_master_idx(self.chunks)
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active:

borg/fuse.py (+1 -0)

@@ -238,3 +238,4 @@ class FuseOperations(llfuse.Operations):
             llfuse.main(single=True)
         finally:
             llfuse.close()
+            self.repository.close()
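
For context, this one-line fuse.py hunk sits at the end of the FUSE mount loop; the surrounding structure is roughly the following (a sketch assuming the shape of FuseOperations.mount, which the diff does not show in full; the signature is assumed):

    def mount(self, mountpoint, extra_options, foreground=False):  # signature assumed
        ...
        try:
            llfuse.main(single=True)   # serve filesystem requests
        finally:
            llfuse.close()             # tear down the FUSE session first
            self.repository.close()    # new: then release the repository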

borg/remote.py (+33 -8)

@@ -359,22 +359,46 @@ class RemoteRepository:
         self.preload_ids += ids
 
 
-class RepositoryCache:
-    """A caching Repository wrapper
+class RepositoryNoCache:
+    """A not caching Repository wrapper, passes through to repository.
 
-    Caches Repository GET operations using a local temporary Repository.
+    Just to have same API (including the context manager) as RepositoryCache.
     """
     def __init__(self, repository):
         self.repository = repository
-        tmppath = tempfile.mkdtemp(prefix='borg-tmp')
-        self.caching_repo = Repository(tmppath, create=True, exclusive=True)
 
-    def __del__(self):
-        self.caching_repo.destroy()
+    def close(self):
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
 
     def get(self, key):
         return next(self.get_many([key]))
 
+    def get_many(self, keys):
+        for data in self.repository.get_many(keys):
+            yield data
+
+
+class RepositoryCache(RepositoryNoCache):
+    """A caching Repository wrapper
+
+    Caches Repository GET operations using a local temporary Repository.
+    """
+    def __init__(self, repository):
+        super().__init__(repository)
+        tmppath = tempfile.mkdtemp(prefix='borg-tmp')
+        self.caching_repo = Repository(tmppath, create=True, exclusive=True)
+
+    def close(self):
+        if self.caching_repo is not None:
+            self.caching_repo.destroy()
+            self.caching_repo = None
+
     def get_many(self, keys):
         unknown_keys = [key for key in keys if key not in self.caching_repo]
         repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
@@ -395,4 +419,5 @@ class RepositoryCache:
 def cache_if_remote(repository):
     if isinstance(repository, RemoteRepository):
         return RepositoryCache(repository)
-    return repository
+    else:
+        return RepositoryNoCache(repository)
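
With both wrappers sharing one API, call sites no longer need an isinstance check and can treat local and remote repositories uniformly. A hypothetical usage sketch (process and ids are placeholder names, not from this commit):

    with cache_if_remote(repository) as cached:
        for data in cached.get_many(ids):
            process(data)
    # on exit: RepositoryCache destroys its temporary repo;
    # RepositoryNoCache's close() is a no-op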