소스 검색

Merge pull request #2584 from enkore/issue/1101.integration.repository

repository: checksum index and hints
enkore 8 년 전
부모
커밋
c77b758e74
2개의 변경된 파일180개의 추가작업 그리고 25개의 파일을 삭제
  1. 82 25
      src/borg/repository.py
  2. 98 0
      src/borg/testsuite/repository.py

+ 82 - 25
src/borg/repository.py

@@ -24,6 +24,7 @@ from .logger import create_logger
 from .lrucache import LRUCache
 from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
 from .algorithms.checksums import crc32
+from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 
 logger = create_logger(__name__)
 
@@ -372,13 +373,28 @@ class Repository:
         self.write_index()
         self.rollback()
 
+    def _read_integrity(self, transaction_id, key):
+        integrity_file = 'integrity.%d' % transaction_id
+        integrity_path = os.path.join(self.path, integrity_file)
+        try:
+            with open(integrity_path, 'rb') as fd:
+                integrity = msgpack.unpack(fd)
+        except FileNotFoundError:
+            return
+        if integrity.get(b'version') != 2:
+            logger.warning('Unknown integrity data version %r in %s', integrity.get(b'version'), integrity_file)
+            return
+        return integrity[key].decode()
+
     def open_index(self, transaction_id, auto_recover=True):
         if transaction_id is None:
             return NSIndex()
-        index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
+        index_path = os.path.join(self.path, 'index.%d' % transaction_id)
+        integrity_data = self._read_integrity(transaction_id, b'index')
         try:
-            return NSIndex.read(index_path)
-        except (ValueError, OSError) as exc:
+            with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
+                return NSIndex.read(fd)
+        except (ValueError, OSError, FileIntegrityError) as exc:
             logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
             os.unlink(index_path)
             if not auto_recover:
@@ -409,11 +425,11 @@ class Repository:
                 raise
         if not self.index or transaction_id is None:
             try:
-                self.index = self.open_index(transaction_id, False)
-            except (ValueError, OSError) as exc:
+                self.index = self.open_index(transaction_id, auto_recover=False)
+            except (ValueError, OSError, FileIntegrityError) as exc:
                 logger.warning('Checking repository transaction due to previous error: %s', exc)
                 self.check_transaction()
-                self.index = self.open_index(transaction_id, False)
+                self.index = self.open_index(transaction_id, auto_recover=False)
         if transaction_id is None:
             self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
             self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
@@ -424,11 +440,12 @@ class Repository:
                 self.io.cleanup(transaction_id)
             hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
             index_path = os.path.join(self.path, 'index.%d' % transaction_id)
+            integrity_data = self._read_integrity(transaction_id, b'hints')
             try:
-                with open(hints_path, 'rb') as fd:
+                with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
                     hints = msgpack.unpack(fd)
-            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
-                logger.warning('Repository hints file missing or corrupted, trying to recover')
+            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
+                logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
                 if not isinstance(e, FileNotFoundError):
                     os.unlink(hints_path)
                 # index must exist at this point
@@ -459,28 +476,68 @@ class Repository:
                         shadowed_segments.remove(segment)
 
     def write_index(self):
-        hints = {b'version': 2,
-                 b'segments': self.segments,
-                 b'compact': self.compact,
-                 b'storage_quota_use': self.storage_quota_use, }
-        transaction_id = self.io.get_segments_transaction_id()
-        assert transaction_id is not None
-        hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
-        with open(hints_file + '.tmp', 'wb') as fd:
-            msgpack.pack(hints, fd)
+        def flush_and_sync(fd):
             fd.flush()
             os.fsync(fd.fileno())
-        os.rename(hints_file + '.tmp', hints_file)
-        self.index.write(os.path.join(self.path, 'index.tmp'))
-        os.rename(os.path.join(self.path, 'index.tmp'),
-                  os.path.join(self.path, 'index.%d' % transaction_id))
+
+        def rename_tmp(file):
+            os.rename(file + '.tmp', file)
+
+        hints = {
+            b'version': 2,
+            b'segments': self.segments,
+            b'compact': self.compact,
+            b'storage_quota_use': self.storage_quota_use,
+        }
+        integrity = {
+            # Integrity version started at 2, the current hints version.
+            # Thus, integrity version == hints version, for now.
+            b'version': 2,
+        }
+        transaction_id = self.io.get_segments_transaction_id()
+        assert transaction_id is not None
+
+        # Log transaction in append-only mode
         if self.append_only:
             with open(os.path.join(self.path, 'transactions'), 'a') as log:
                 print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
+
+        # Write hints file
+        hints_name = 'hints.%d' % transaction_id
+        hints_file = os.path.join(self.path, hints_name)
+        with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd:
+            msgpack.pack(hints, fd)
+            flush_and_sync(fd)
+        integrity[b'hints'] = fd.integrity_data
+
+        # Write repository index
+        index_name = 'index.%d' % transaction_id
+        index_file = os.path.join(self.path, index_name)
+        with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd:
+            # XXX: Consider using SyncFile for index write-outs.
+            self.index.write(fd)
+            flush_and_sync(fd)
+        integrity[b'index'] = fd.integrity_data
+
+        # Write integrity file, containing checksums of the hints and index files
+        integrity_name = 'integrity.%d' % transaction_id
+        integrity_file = os.path.join(self.path, integrity_name)
+        with open(integrity_file + '.tmp', 'wb') as fd:
+            msgpack.pack(integrity, fd)
+            flush_and_sync(fd)
+
+        # Rename the integrity file first
+        rename_tmp(integrity_file)
+        sync_dir(self.path)
+        # Rename the others after the integrity file is hypothetically on disk
+        rename_tmp(hints_file)
+        rename_tmp(index_file)
+        sync_dir(self.path)
+
         # Remove old auxiliary files
         current = '.%d' % transaction_id
         for name in os.listdir(self.path):
-            if not name.startswith(('index.', 'hints.')):
+            if not name.startswith(('index.', 'hints.', 'integrity.')):
                 continue
             if name.endswith(current):
                 continue
@@ -563,7 +620,7 @@ class Repository:
             # get rid of the old, sparse, unused segments. free space.
             for segment in unused:
                 logger.debug('complete_xfer: deleting unused segment %d', segment)
-                assert self.segments.pop(segment) == 0
+                assert self.segments.pop(segment) == 0, 'Corrupted segment reference count - corrupted index or hints'
                 self.io.delete_segment(segment)
                 del self.compact[segment]
             unused = []
@@ -657,7 +714,7 @@ class Repository:
                             new_segment, size = self.io.write_delete(key)
                         self.compact[new_segment] += size
                         segments.setdefault(new_segment, 0)
-            assert segments[segment] == 0
+            assert segments[segment] == 0, 'Corrupted segment reference count - corrupted index or hints'
             unused.append(segment)
             pi.show()
         pi.finish()

+ 98 - 0
src/borg/testsuite/repository.py

@@ -6,6 +6,8 @@ import sys
 import tempfile
 from unittest.mock import patch
 
+import msgpack
+
 import pytest
 
 from ..hashindex import NSIndex
@@ -537,6 +539,42 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
         with self.repository:
             assert len(self.repository) == 1
 
+    def _corrupt_index(self):
+        # HashIndex is able to detect incorrect headers and file lengths,
+        # but on its own it can't tell if the data is correct.
+        index_path = os.path.join(self.repository.path, 'index.1')
+        with open(index_path, 'r+b') as fd:
+            index_data = fd.read()
+            # Flip one bit in a key stored in the index
+            corrupted_key = (int.from_bytes(H(0), 'little') ^ 1).to_bytes(32, 'little')
+            corrupted_index_data = index_data.replace(H(0), corrupted_key)
+            assert corrupted_index_data != index_data
+            assert len(corrupted_index_data) == len(index_data)
+            fd.seek(0)
+            fd.write(corrupted_index_data)
+
+    def test_index_corrupted(self):
+        # HashIndex is able to detect incorrect headers and file lengths,
+        # but on its own it can't tell if the data itself is correct.
+        self._corrupt_index()
+        with self.repository:
+            # Data corruption is detected due to mismatching checksums
+            # and fixed by rebuilding the index.
+            assert len(self.repository) == 1
+            assert self.repository.get(H(0)) == b'foo'
+
+    def test_index_corrupted_without_integrity(self):
+        self._corrupt_index()
+        integrity_path = os.path.join(self.repository.path, 'integrity.1')
+        os.unlink(integrity_path)
+        with self.repository:
+            # Since the corrupted key is not noticed, the repository still thinks
+            # it contains one key...
+            assert len(self.repository) == 1
+            with pytest.raises(Repository.ObjectNotFound):
+                # ... but the real, uncorrupted key is not found in the corrupted index.
+                self.repository.get(H(0))
+
     def test_unreadable_index(self):
         index = os.path.join(self.repository.path, 'index.1')
         os.unlink(index)
@@ -544,6 +582,66 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
         with self.assert_raises(OSError):
             self.do_commit()
 
+    def test_unknown_integrity_version(self):
+        # For now an unknown integrity data version is ignored and not an error.
+        integrity_path = os.path.join(self.repository.path, 'integrity.1')
+        with open(integrity_path, 'r+b') as fd:
+            msgpack.pack({
+                # Borg only understands version 2
+                b'version': 4.7,
+            }, fd)
+            fd.truncate()
+        with self.repository:
+            # No issues accessing the repository
+            assert len(self.repository) == 1
+            assert self.repository.get(H(0)) == b'foo'
+
+    def _subtly_corrupted_hints_setup(self):
+        with self.repository:
+            self.repository.append_only = True
+            assert len(self.repository) == 1
+            assert self.repository.get(H(0)) == b'foo'
+            self.repository.put(H(1), b'bar')
+            self.repository.put(H(2), b'baz')
+            self.repository.commit()
+            self.repository.put(H(2), b'bazz')
+            self.repository.commit()
+
+        hints_path = os.path.join(self.repository.path, 'hints.5')
+        with open(hints_path, 'r+b') as fd:
+            hints = msgpack.unpack(fd)
+            fd.seek(0)
+            # Corrupt segment refcount
+            assert hints[b'segments'][2] == 1
+            hints[b'segments'][2] = 0
+            msgpack.pack(hints, fd)
+            fd.truncate()
+
+    def test_subtly_corrupted_hints(self):
+        self._subtly_corrupted_hints_setup()
+        with self.repository:
+            self.repository.append_only = False
+            self.repository.put(H(3), b'1234')
+            # Do a compaction run. Succeeds, since the failed checksum prompted a rebuild of the index+hints.
+            self.repository.commit()
+
+            assert len(self.repository) == 4
+            assert self.repository.get(H(0)) == b'foo'
+            assert self.repository.get(H(1)) == b'bar'
+            assert self.repository.get(H(2)) == b'bazz'
+
+    def test_subtly_corrupted_hints_without_integrity(self):
+        self._subtly_corrupted_hints_setup()
+        integrity_path = os.path.join(self.repository.path, 'integrity.5')
+        os.unlink(integrity_path)
+        with self.repository:
+            self.repository.append_only = False
+            self.repository.put(H(3), b'1234')
+            # Do a compaction run. Fails, since the corrupted refcount was not detected and leads to an assertion failure.
+            with pytest.raises(AssertionError) as exc_info:
+                self.repository.commit()
+            assert 'Corrupted segment reference count' in str(exc_info.value)
+
 
 class RepositoryCheckTestCase(RepositoryTestCaseBase):