Browse Source

Merge pull request #2255 from ThomasWaldmann/async-errors

async exception processing
TW 8 years ago
parent
commit
4fe9d067da

+ 28 - 9
src/borg/archive.py

@@ -260,7 +260,8 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats
         self.stats = stats
 
 
     def write_chunk(self, chunk):
     def write_chunk(self, chunk):
-        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
+        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats, wait=False)
+        self.cache.repository.async_response(wait=False)
         return id_
         return id_
 
 
 
 
@@ -469,6 +470,8 @@ Utilization of max. archive size: {csize_max:.0%}
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
         self.id = self.key.id_hash(data)
         self.id = self.key.id_hash(data)
         self.cache.add_chunk(self.id, Chunk(data), self.stats)
         self.cache.add_chunk(self.id, Chunk(data), self.stats)
+        while self.repository.async_response(wait=True) is not None:
+            pass
         self.manifest.archives[name] = (self.id, metadata.time)
         self.manifest.archives[name] = (self.id, metadata.time)
         self.manifest.write()
         self.manifest.write()
         self.repository.commit()
         self.repository.commit()
@@ -730,18 +733,27 @@ Utilization of max. archive size: {csize_max:.0%}
         class ChunksIndexError(Error):
         class ChunksIndexError(Error):
             """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""
             """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""
 
 
-        def chunk_decref(id, stats):
-            nonlocal error
+        exception_ignored = object()
+
+        def fetch_async_response(wait=True):
             try:
             try:
-                self.cache.chunk_decref(id, stats)
-            except KeyError:
-                cid = bin_to_hex(id)
-                raise ChunksIndexError(cid)
+                return self.repository.async_response(wait=wait)
             except Repository.ObjectNotFound as e:
             except Repository.ObjectNotFound as e:
+                nonlocal error
                 # object not in repo - strange, but we wanted to delete it anyway.
                 # object not in repo - strange, but we wanted to delete it anyway.
                 if forced == 0:
                 if forced == 0:
                     raise
                     raise
                 error = True
                 error = True
+                return exception_ignored  # must not return None here
+
+        def chunk_decref(id, stats):
+            try:
+                self.cache.chunk_decref(id, stats, wait=False)
+            except KeyError:
+                cid = bin_to_hex(id)
+                raise ChunksIndexError(cid)
+            else:
+                fetch_async_response(wait=False)
 
 
         error = False
         error = False
         try:
         try:
@@ -778,6 +790,10 @@ Utilization of max. archive size: {csize_max:.0%}
         # some harmless exception.
         # some harmless exception.
         chunk_decref(self.id, stats)
         chunk_decref(self.id, stats)
         del self.manifest.archives[self.name]
         del self.manifest.archives[self.name]
+        while fetch_async_response(wait=True) is not None:
+            # we did async deletes, process outstanding results (== exceptions),
+            # so there is nothing pending when we return and our caller wants to commit.
+            pass
         if error:
         if error:
             logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
             logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
             logger.warning('borg check --repair is required to free all space.')
             logger.warning('borg check --repair is required to free all space.')
@@ -865,7 +881,9 @@ Utilization of max. archive size: {csize_max:.0%}
     def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
     def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
         if not chunk_processor:
         if not chunk_processor:
             def chunk_processor(data):
             def chunk_processor(data):
-                return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)
+                chunk_entry = cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats, wait=False)
+                self.cache.repository.async_response(wait=False)
+                return chunk_entry
 
 
         item.chunks = []
         item.chunks = []
         from_chunk = 0
         from_chunk = 0
@@ -1654,7 +1672,8 @@ class ArchiveRecreater:
             if Compressor.detect(old_chunk.data).name == compression_spec.name:
             if Compressor.detect(old_chunk.data).name == compression_spec.name:
                 # Stored chunk has the same compression we wanted
                 # Stored chunk has the same compression we wanted
                 overwrite = False
                 overwrite = False
-        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
+        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False)
+        self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
         self.seen_chunks.add(chunk_entry.id)
         return chunk_entry
         return chunk_entry
 
 

+ 4 - 4
src/borg/cache.py

@@ -524,7 +524,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             self.do_cache = os.path.isdir(archive_path)
             self.do_cache = os.path.isdir(archive_path)
             self.chunks = create_master_idx(self.chunks)
             self.chunks = create_master_idx(self.chunks)
 
 
-    def add_chunk(self, id, chunk, stats, overwrite=False):
+    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
         if not self.txn_active:
         if not self.txn_active:
             self.begin_txn()
             self.begin_txn()
         size = len(chunk.data)
         size = len(chunk.data)
@@ -533,7 +533,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             return self.chunk_incref(id, stats)
             return self.chunk_incref(id, stats)
         data = self.key.encrypt(chunk)
         data = self.key.encrypt(chunk)
         csize = len(data)
         csize = len(data)
-        self.repository.put(id, data, wait=False)
+        self.repository.put(id, data, wait=wait)
         self.chunks.add(id, 1, size, csize)
         self.chunks.add(id, 1, size, csize)
         stats.update(size, csize, not refcount)
         stats.update(size, csize, not refcount)
         return ChunkListEntry(id, size, csize)
         return ChunkListEntry(id, size, csize)
@@ -554,13 +554,13 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         stats.update(size, csize, False)
         stats.update(size, csize, False)
         return ChunkListEntry(id, size, csize)
         return ChunkListEntry(id, size, csize)
 
 
-    def chunk_decref(self, id, stats):
+    def chunk_decref(self, id, stats, wait=True):
         if not self.txn_active:
         if not self.txn_active:
             self.begin_txn()
             self.begin_txn()
         count, size, csize = self.chunks.decref(id)
         count, size, csize = self.chunks.decref(id)
         if count == 0:
         if count == 0:
             del self.chunks[id]
             del self.chunks[id]
-            self.repository.delete(id, wait=False)
+            self.repository.delete(id, wait=wait)
             stats.update(-size, -csize, True)
             stats.update(-size, -csize, True)
         else:
         else:
             stats.update(-size, -csize, False)
             stats.update(-size, -csize, False)

+ 39 - 4
src/borg/remote.py

@@ -513,6 +513,8 @@ class RemoteRepository:
         self.chunkid_to_msgids = {}
         self.chunkid_to_msgids = {}
         self.ignore_responses = set()
         self.ignore_responses = set()
         self.responses = {}
         self.responses = {}
+        self.async_responses = {}
+        self.shutdown_time = None
         self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
         self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
         self.unpacker = get_limited_unpacker('client')
         self.unpacker = get_limited_unpacker('client')
         self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
         self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
@@ -604,6 +606,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
     def __exit__(self, exc_type, exc_val, exc_tb):
     def __exit__(self, exc_type, exc_val, exc_tb):
         try:
         try:
             if exc_type is not None:
             if exc_type is not None:
+                self.shutdown_time = time.monotonic() + 30
                 self.rollback()
                 self.rollback()
         finally:
         finally:
             # in any case, we want to cleanly close the repo, even if the
             # in any case, we want to cleanly close the repo, even if the
@@ -670,8 +673,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         for resp in self.call_many(cmd, [args], **kw):
         for resp in self.call_many(cmd, [args], **kw):
             return resp
             return resp
 
 
-    def call_many(self, cmd, calls, wait=True, is_preloaded=False):
-        if not calls:
+    def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
+        if not calls and cmd != 'async_responses':
             return
             return
 
 
         def pop_preload_msgid(chunkid):
         def pop_preload_msgid(chunkid):
@@ -714,6 +717,12 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         calls = list(calls)
         calls = list(calls)
         waiting_for = []
         waiting_for = []
         while wait or calls:
         while wait or calls:
+            if self.shutdown_time and time.monotonic() > self.shutdown_time:
+                # we are shutting this RemoteRepository down already, make sure we do not waste
+                # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
+                logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.',
+                             len(waiting_for), len(self.async_responses))
+                return
             while waiting_for:
             while waiting_for:
                 try:
                 try:
                     unpacked = self.responses.pop(waiting_for[0])
                     unpacked = self.responses.pop(waiting_for[0])
@@ -726,6 +735,22 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                             return
                             return
                 except KeyError:
                 except KeyError:
                     break
                     break
+            if cmd == 'async_responses':
+                while True:
+                    try:
+                        msgid, unpacked = self.async_responses.popitem()
+                    except KeyError:
+                        # there is nothing left what we already have received
+                        if async_wait and self.ignore_responses:
+                            # but do not return if we shall wait and there is something left to wait for:
+                            break
+                        else:
+                            return
+                    else:
+                        if b'exception_class' in unpacked:
+                            handle_error(unpacked)
+                        else:
+                            yield unpacked[RESULT]
             if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
             if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
                 w_fds = [self.stdin_fd]
                 w_fds = [self.stdin_fd]
             else:
             else:
@@ -755,8 +780,14 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                             raise UnexpectedRPCDataFormatFromServer(data)
                             raise UnexpectedRPCDataFormatFromServer(data)
                         if msgid in self.ignore_responses:
                         if msgid in self.ignore_responses:
                             self.ignore_responses.remove(msgid)
                             self.ignore_responses.remove(msgid)
+                            # async methods never return values, but may raise exceptions.
                             if b'exception_class' in unpacked:
                             if b'exception_class' in unpacked:
-                                handle_error(unpacked)
+                                self.async_responses[msgid] = unpacked
+                            else:
+                                # we currently do not have async result values except "None",
+                                # so we do not add them into async_responses.
+                                if unpacked[RESULT] is not None:
+                                    self.async_responses[msgid] = unpacked
                         else:
                         else:
                             self.responses[msgid] = unpacked
                             self.responses[msgid] = unpacked
                 elif fd is self.stderr_fd:
                 elif fd is self.stderr_fd:
@@ -805,7 +836,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                         # that the fd should be writable
                         # that the fd should be writable
                         if e.errno != errno.EAGAIN:
                         if e.errno != errno.EAGAIN:
                             raise
                             raise
-        self.ignore_responses |= set(waiting_for)
+        self.ignore_responses |= set(waiting_for)  # we lose order here
 
 
     @api(since=parse_version('1.0.0'),
     @api(since=parse_version('1.0.0'),
          append_only={'since': parse_version('1.0.7'), 'previously': False})
          append_only={'since': parse_version('1.0.7'), 'previously': False})
@@ -883,6 +914,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
             self.p.wait()
             self.p.wait()
             self.p = None
             self.p = None
 
 
+    def async_response(self, wait=True):
+        for resp in self.call_many('async_responses', calls=[], wait=True, async_wait=wait):
+            return resp
+
     def preload(self, ids):
     def preload(self, ids):
         self.preload_ids += ids
         self.preload_ids += ids
 
 

+ 22 - 0
src/borg/repository.py

@@ -785,6 +785,7 @@ class Repository:
         self._active_txn = False
         self._active_txn = False
 
 
     def rollback(self):
     def rollback(self):
+        # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
         self._rollback(cleanup=False)
         self._rollback(cleanup=False)
 
 
     def __len__(self):
     def __len__(self):
@@ -862,6 +863,11 @@ class Repository:
             yield self.get(id_)
             yield self.get(id_)
 
 
     def put(self, id, data, wait=True):
     def put(self, id, data, wait=True):
+        """put a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+              deal with async results / exceptions later.
+        """
         if not self._active_txn:
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
             self.prepare_txn(self.get_transaction_id())
         try:
         try:
@@ -881,6 +887,11 @@ class Repository:
         self.index[id] = segment, offset
         self.index[id] = segment, offset
 
 
     def delete(self, id, wait=True):
     def delete(self, id, wait=True):
+        """delete a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+              deal with async results / exceptions later.
+        """
         if not self._active_txn:
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
             self.prepare_txn(self.get_transaction_id())
         try:
         try:
@@ -895,6 +906,17 @@ class Repository:
         self.compact[segment] += size
         self.compact[segment] += size
         self.segments.setdefault(segment, 0)
         self.segments.setdefault(segment, 0)
 
 
+    def async_response(self, wait=True):
+        """Get one async result (only applies to remote repositories).
+
+        async commands (== calls with wait=False, e.g. delete and put) have no results,
+        but may raise exceptions. These async exceptions must get collected later via
+        async_response() calls. Repeat the call until it returns None.
+        The previous calls might either return one (non-None) result or raise an exception.
+        If wait=True is given and there are outstanding responses, it will wait for them
+        to arrive. With wait=False, it will only return already received responses.
+        """
+
     def preload(self, ids):
     def preload(self, ids):
         """Preload objects (only applies to remote repositories)
         """Preload objects (only applies to remote repositories)
         """
         """

+ 6 - 1
src/borg/testsuite/archive.py

@@ -63,10 +63,15 @@ This archive:                   20 B                 10 B                 10 B""
 
 
 class MockCache:
 class MockCache:
 
 
+    class MockRepo:
+        def async_response(self, wait=True):
+            pass
+
     def __init__(self):
     def __init__(self):
         self.objects = {}
         self.objects = {}
+        self.repository = self.MockRepo()
 
 
-    def add_chunk(self, id, chunk, stats=None):
+    def add_chunk(self, id, chunk, stats=None, wait=True):
         self.objects[id] = chunk.data
         self.objects[id] = chunk.data
         return id, len(chunk.data), len(chunk.data)
         return id, len(chunk.data), len(chunk.data)
 
 

+ 2 - 1
src/borg/testsuite/archiver.py

@@ -1255,7 +1255,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
                     repository.delete(first_chunk_id)
                     repository.delete(first_chunk_id)
                     repository.commit()
                     repository.commit()
                     break
                     break
-        self.cmd('delete', '--force', self.repository_location + '::test')
+        output = self.cmd('delete', '--force', self.repository_location + '::test')
+        self.assert_in('deleted archive was corrupted', output)
         self.cmd('check', '--repair', self.repository_location)
         self.cmd('check', '--repair', self.repository_location)
         output = self.cmd('list', self.repository_location)
         output = self.cmd('list', self.repository_location)
         self.assert_not_in('test', output)
         self.assert_not_in('test', output)