浏览代码

Auto-recover from corrupted index/hints file(s)

And don't swallow all OSErrors when creating archives. We need to work on
that on a more general level...
Marian Beermann 9 年之前
父节点
当前提交
252c1b9802
共有 4 个文件被更改,包括 86 次插入9 次删除
  1. 1 1
      borg/hashindex.pyx
  2. 4 0
      borg/helpers.py
  3. 40 7
      borg/repository.py
  4. 41 1
      borg/testsuite/repository.py

+ 1 - 1
borg/hashindex.pyx

@@ -63,7 +63,7 @@ cdef class IndexBase:
             path = os.fsencode(path)
             path = os.fsencode(path)
             self.index = hashindex_read(path)
             self.index = hashindex_read(path)
             if not self.index:
             if not self.index:
-                raise Exception('hashindex_read failed')
+                raise RuntimeError('hashindex_read failed')
         else:
         else:
             self.index = hashindex_init(capacity, self.key_size, self.value_size)
             self.index = hashindex_init(capacity, self.key_size, self.value_size)
             if not self.index:
             if not self.index:

+ 4 - 0
borg/helpers.py

@@ -65,6 +65,10 @@ class ErrorWithTraceback(Error):
     traceback = True
     traceback = True
 
 
 
 
+class InternalOSError(ErrorWithTraceback):
+    """Error while accessing repository / cache files"""
+
+
 class IntegrityError(ErrorWithTraceback):
 class IntegrityError(ErrorWithTraceback):
     """Data integrity error"""
     """Data integrity error"""
 
 

+ 40 - 7
borg/repository.py

@@ -15,7 +15,8 @@ from zlib import crc32
 
 
 import msgpack
 import msgpack
 from .constants import *  # NOQA
 from .constants import *  # NOQA
-from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex
+from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \
+    bin_to_hex
 from .hashindex import NSIndex
 from .hashindex import NSIndex
 from .locking import UpgradableLock, LockError, LockErrorT
 from .locking import UpgradableLock, LockError, LockErrorT
 from .lrucache import LRUCache
 from .lrucache import LRUCache
@@ -178,7 +179,7 @@ class Repository:
         else:
         else:
             return None
             return None
 
 
-    def get_transaction_id(self):
+    def check_transaction(self):
         index_transaction_id = self.get_index_transaction_id()
         index_transaction_id = self.get_index_transaction_id()
         segments_transaction_id = self.io.get_segments_transaction_id()
         segments_transaction_id = self.io.get_segments_transaction_id()
         if index_transaction_id is not None and segments_transaction_id is None:
         if index_transaction_id is not None and segments_transaction_id is None:
@@ -191,6 +192,9 @@ class Repository:
             else:
             else:
                 replay_from = index_transaction_id
                 replay_from = index_transaction_id
             self.replay_segments(replay_from, segments_transaction_id)
             self.replay_segments(replay_from, segments_transaction_id)
+
+    def get_transaction_id(self):
+        self.check_transaction()
         return self.get_index_transaction_id()
         return self.get_index_transaction_id()
 
 
     def break_lock(self):
     def break_lock(self):
@@ -231,10 +235,23 @@ class Repository:
         self.write_index()
         self.write_index()
         self.rollback()
         self.rollback()
 
 
-    def open_index(self, transaction_id):
+    def open_index(self, transaction_id, auto_recover=True):
         if transaction_id is None:
         if transaction_id is None:
             return NSIndex()
             return NSIndex()
-        return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
+        index_path = (os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8')
+        try:
+            return NSIndex.read(index_path)
+        except RuntimeError as re:
+            assert str(re) == 'hashindex_read failed'  # everything else means we're in *deep* trouble
+            # corrupted index file, need to replay segments
+            os.unlink(os.path.join(self.path, 'hints.%d' % transaction_id))
+            os.unlink(os.path.join(self.path, 'index.%d' % transaction_id))
+            if not auto_recover:
+                raise
+            self.prepare_txn(self.get_transaction_id())
+            # don't leave an open transaction around
+            self.commit()
+            return self.open_index(self.get_transaction_id())
 
 
     def prepare_txn(self, transaction_id, do_cleanup=True):
     def prepare_txn(self, transaction_id, do_cleanup=True):
         self._active_txn = True
         self._active_txn = True
@@ -247,15 +264,31 @@ class Repository:
             self._active_txn = False
             self._active_txn = False
             raise
             raise
         if not self.index or transaction_id is None:
         if not self.index or transaction_id is None:
-            self.index = self.open_index(transaction_id)
+            try:
+                self.index = self.open_index(transaction_id, False)
+            except RuntimeError:
+                self.check_transaction()
+                self.index = self.open_index(transaction_id, False)
         if transaction_id is None:
         if transaction_id is None:
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
             self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
             self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
         else:
         else:
             if do_cleanup:
             if do_cleanup:
                 self.io.cleanup(transaction_id)
                 self.io.cleanup(transaction_id)
-            with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
-                hints = msgpack.unpack(fd)
+            try:
+                with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
+                    hints = msgpack.unpack(fd)
+            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
+                # corrupted or deleted hints file, need to replay segments
+                if not isinstance(e, FileNotFoundError):
+                    os.unlink(os.path.join(self.path, 'hints.%d' % transaction_id))
+                # index must exist at this point
+                os.unlink(os.path.join(self.path, 'index.%d' % transaction_id))
+                self.check_transaction()
+                self.prepare_txn(transaction_id)
+                return
+            except OSError as os_error:
+                raise InternalOSError from os_error
             if hints[b'version'] == 1:
             if hints[b'version'] == 1:
                 logger.debug('Upgrading from v1 hints.%d', transaction_id)
                 logger.debug('Upgrading from v1 hints.%d', transaction_id)
                 self.segments = hints[b'segments']
                 self.segments = hints[b'segments']

+ 41 - 1
borg/testsuite/repository.py

@@ -7,7 +7,7 @@ import tempfile
 from unittest.mock import patch
 from unittest.mock import patch
 
 
 from ..hashindex import NSIndex
 from ..hashindex import NSIndex
-from ..helpers import Location, IntegrityError
+from ..helpers import Location, IntegrityError, InternalOSError
 from ..locking import UpgradableLock, LockFailed
 from ..locking import UpgradableLock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint
 from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint
 from ..repository import Repository, LoggedIO, MAGIC
 from ..repository import Repository, LoggedIO, MAGIC
@@ -270,6 +270,46 @@ class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
         assert segments_in_repository() == 6
         assert segments_in_repository() == 6
 
 
 
 
+class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
+    def setUp(self):
+        super().setUp()
+        self.repository.put(b'00000000000000000000000000000000', b'foo')
+        self.repository.commit()
+        self.repository.close()
+
+    def do_commit(self):
+        with self.repository:
+            self.repository.put(b'00000000000000000000000000000000', b'fox')
+            self.repository.commit()
+
+    def test_corrupted_hints(self):
+        with open(os.path.join(self.repository.path, 'hints.0'), 'ab') as fp:
+            fp.write(b'123456789')
+        self.do_commit()
+
+    def test_deleted_hints(self):
+        os.unlink(os.path.join(self.repository.path, 'hints.0'))
+        self.do_commit()
+
+    def test_unreadable_hints(self):
+        hints = os.path.join(self.repository.path, 'hints.0')
+        os.unlink(hints)
+        os.mkdir(hints)
+        with self.assert_raises(InternalOSError):
+            self.do_commit()
+
+    def test_index(self):
+        with open(os.path.join(self.repository.path, 'index.0'), 'wb') as fp:
+            fp.write(b'123456789')
+        self.do_commit()
+
+    def test_index_outside_transaction(self):
+        with open(os.path.join(self.repository.path, 'index.0'), 'wb') as fp:
+            fp.write(b'123456789')
+        with self.repository:
+            assert len(self.repository) == 1
+
+
 class RepositoryCheckTestCase(RepositoryTestCaseBase):
 class RepositoryCheckTestCase(RepositoryTestCaseBase):
 
 
     def list_indices(self):
     def list_indices(self):