
Implemented archive metadata checking and repair

Jonas Borgström 11 years ago
parent commit 32e773c15d
7 changed files with 310 additions and 36 deletions
  1. attic/archive.py (+217 −13)
  2. attic/archiver.py (+11 −10)
  3. attic/cache.py (+3 −3)
  4. attic/helpers.py (+6 −5)
  5. attic/repository.py (+2 −1)
  6. attic/testsuite/archive.py (+2 −2)
  7. attic/testsuite/archiver.py (+69 −2)

+ 217 - 13
attic/archive.py

@@ -1,6 +1,10 @@
+from binascii import hexlify
 from datetime import datetime, timedelta, timezone
 from getpass import getuser
-from itertools import zip_longest
+from itertools import groupby
+import shutil
+import tempfile
+from attic.key import key_factory
 import msgpack
 import os
 import socket
@@ -10,8 +14,9 @@ import time
 from io import BytesIO
 from attic import xattr
 from attic.chunker import chunkify
+from attic.hashindex import ChunkIndex
 from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
-    Statistics, decode_dict, st_mtime_ns, make_path_safe
+    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe
 
 ITEMS_BUFFER = 1024 * 1024
 CHUNK_MIN = 1024
@@ -44,23 +49,26 @@ class DownloadPipeline:
                 yield item
 
     def fetch_many(self, ids, is_preloaded=False):
-        for id_, data in zip_longest(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
+        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
             yield self.key.decrypt(id_, data)
 
 
 class ChunkBuffer:
     BUFFER_SIZE = 1 * 1024 * 1024
 
-    def __init__(self, cache, key, stats):
+    def __init__(self, key):
         self.buffer = BytesIO()
         self.packer = msgpack.Packer(unicode_errors='surrogateescape')
-        self.cache = cache
         self.chunks = []
         self.key = key
-        self.stats = stats
 
     def add(self, item):
         self.buffer.write(self.packer.pack(item))
+        if self.is_full():
+            self.flush()
+
+    def write_chunk(self, chunk):
+        raise NotImplementedError
 
     def flush(self, flush=False):
         if self.buffer.tell() == 0:
@@ -72,8 +80,7 @@ class ChunkBuffer:
         # Leave the last partial chunk in the buffer unless flush is True
         end = None if flush or len(chunks) == 1 else -1
         for chunk in chunks[:end]:
-            id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
-            self.chunks.append(id_)
+            self.chunks.append(self.write_chunk(chunk))
         if end == -1:
             self.buffer.write(chunks[-1])
 
@@ -81,6 +88,18 @@ class ChunkBuffer:
         return self.buffer.tell() > self.BUFFER_SIZE
 
 
+class CacheChunkBuffer(ChunkBuffer):
+
+    def __init__(self, cache, key, stats):
+        super(CacheChunkBuffer, self).__init__(key)
+        self.cache = cache
+        self.stats = stats
+
+    def write_chunk(self, chunk):
+        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
+        return id_
+
+
 class Archive:
 
     class DoesNotExist(Error):
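
The refactor above turns ChunkBuffer into a small template class: add() packs items and flushes once the buffer fills, flush() cuts the buffered bytes with chunkify() and hands every finished chunk to write_chunk(), which subclasses supply. CacheChunkBuffer routes chunks through the cache; other storage policies only need to override the same hook. A minimal sketch of such a subclass (the dict-backed store is assumed for illustration, not part of this commit):

    from attic.archive import ChunkBuffer

    class DictChunkBuffer(ChunkBuffer):
        """Hypothetical subclass: keep chunks in a plain dict instead of the cache."""
        def __init__(self, key):
            super().__init__(key)
            self.store = {}

        def write_chunk(self, chunk):
            id_ = self.key.id_hash(chunk)  # content-addressed id, as in CacheChunkBuffer
            self.store[id_] = chunk
            return id_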
@@ -101,7 +120,7 @@ class Archive:
         self.name = name
         self.checkpoint_interval = checkpoint_interval
         self.numeric_owner = numeric_owner
-        self.items_buffer = ChunkBuffer(self.cache, self.key, self.stats)
+        self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
         self.pipeline = DownloadPipeline(self.repository, self.key)
         if create:
             if name in manifest.archives:
@@ -148,8 +167,6 @@ class Archive:
         if now - self.last_checkpoint > self.checkpoint_interval:
             self.last_checkpoint = now
             self.write_checkpoint()
-        if self.items_buffer.is_full():
-            self.items_buffer.flush()
 
     def write_checkpoint(self):
         self.save(self.checkpoint_name)
@@ -192,7 +209,7 @@ class Archive:
         cache.begin_txn()
         stats = Statistics()
         add(self.id)
-        for id, chunk in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
+        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
             add(id)
             unpacker.feed(self.key.decrypt(id, chunk))
             for item in unpacker:
@@ -306,7 +323,7 @@ class Archive:
 
     def delete(self, cache):
         unpacker = msgpack.Unpacker(use_list=False)
-        for id_, data in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
+        for id_, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
             unpacker.feed(self.key.decrypt(id_, data))
             self.cache.chunk_decref(id_)
             for item in unpacker:
@@ -388,3 +405,190 @@ class Archive:
     def list_archives(repository, key, manifest, cache=None):
         for name, info in manifest.archives.items():
             yield Archive(repository, key, manifest, name, cache=cache)
+
+
+class ArchiveChecker:
+
+    def __init__(self):
+        self.error_found = False
+        self.progress = True
+        self.possibly_superseded = set()
+        self.tmpdir = tempfile.mkdtemp()
+
+    def __del__(self):
+        shutil.rmtree(self.tmpdir)
+
+    def init_chunks(self):
+        self.chunks = ChunkIndex.create(os.path.join(self.tmpdir, 'chunks').encode('utf-8'))
+        marker = None
+        while True:
+            result = self.repository.list(limit=10000, marker=marker)
+            if not result:
+                break
+            marker = result[-1]
+            for id_ in result:
+                self.chunks[id_] = (0, 0, 0)
+
+    def report_progress(self, msg, error=False):
+        if error:
+            self.error_found = True
+        if error or self.progress:
+            print(msg, file=sys.stderr)
+            sys.stderr.flush()
+
+    def identify_key(self, repository):
+        cdata = repository.get(next(self.chunks.iteritems())[0])
+        return key_factory(repository, cdata)
+
+    def rebuild_manifest(self):
+        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
+        manifest = Manifest(self.key, self.repository)
+        for chunk_id, _ in self.chunks.iteritems():
+            cdata = self.repository.get(chunk_id)
+            data = self.key.decrypt(chunk_id, cdata)
+            try:
+                archive = msgpack.unpackb(data)
+            except Exception:  # not every chunk is archive metadata; skip undecodable ones
+                continue
+            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
+                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
+                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
+        self.report_progress('Manifest rebuild complete', error=True)
+        return manifest
+
+    def check(self, repository, progress=True, repair=False):
+        self.report_progress('Starting archive consistency check...')
+        self.repair = repair
+        self.progress = progress
+        self.repository = repository
+        self.init_chunks()
+        self.key = self.identify_key(repository)
+        if Manifest.MANIFEST_ID not in self.chunks:
+            self.manifest = self.rebuild_manifest()
+        else:
+            self.manifest, _ = Manifest.load(repository)
+        self.rebuild_chunks()
+        self.verify_chunks()
+        if not self.error_found:
+            self.report_progress('Archive consistency check complete, no errors found.')
+        return self.repair or not self.error_found
+
+    def verify_chunks(self):
+        unused = set()
+        for id_, (count, size, csize) in self.chunks.iteritems():
+            if count == 0:
+                unused.add(id_)
+        unexpected = unused - self.possibly_superseded
+        if unexpected:
+            self.report_progress('{} excessive objects found'.format(len(unexpected)), error=True)
+        if self.repair:
+            for id_ in unused:
+                self.repository.delete(id_)
+            self.manifest.write()
+            self.repository.commit()
+
+    def rebuild_chunks(self):
+        # Exclude the manifest from chunks
+        del self.chunks[Manifest.MANIFEST_ID]
+
+        def record_unused(id_):
+            if self.chunks.get(id_, (0,))[0] == 0:
+                self.possibly_superseded.add(id_)
+
+        def add_callback(chunk):
+            id_ = self.key.id_hash(chunk)
+            cdata = self.key.encrypt(chunk)
+            add_reference(id_, len(chunk), len(cdata), cdata)
+            return id_
+
+        def add_reference(id_, size, csize, cdata=None):
+            try:
+                count, _, _ = self.chunks[id_]
+                self.chunks[id_] = count + 1, size, csize
+            except KeyError:
+                assert cdata is not None
+                self.chunks[id_] = 1, size, csize
+                if self.repair:
+                    self.repository.put(id_, cdata)
+
+        def verify_file_chunks(item):
+            offset = 0
+            chunk_list = []
+            for chunk_id, size, csize in item[b'chunks']:
+                if chunk_id not in self.chunks:
+                    # If a file chunk is missing, create an all empty replacement chunk
+                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
+                    data = bytes(size)
+                    chunk_id = self.key.id_hash(data)
+                    cdata = self.key.encrypt(data)
+                    csize = len(cdata)
+                    add_reference(chunk_id, size, csize, cdata)
+                else:
+                    add_reference(chunk_id, size, csize)
+                chunk_list.append((chunk_id, size, csize))
+                offset += size
+            item[b'chunks'] = chunk_list
+
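+        # A missing items chunk can leave the next readable chunk starting in
+        # the middle of a serialized item. Scan forward one byte at a time
+        # until the data unpacks to something that looks like an item dict.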
+        def msgpack_resync(data):
+            data = memoryview(data)
+            while data:
+                unpacker = msgpack.Unpacker()
+                unpacker.feed(data)
+                item = next(unpacker)
+                if isinstance(item, dict) and b'path' in item:
+                    return data
+                data = data[1:]
+
+        def robust_iterator(archive):
+            prev_state = None
+            state = 0
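+            # groupby() splits the archive's metadata chunk list into
+            # alternating runs of available and missing chunks: the state
+            # counter is bumped at every transition, so odd states mark runs
+            # of missing chunks and even states runs that can be decrypted.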
+            def missing_chunk_detector(chunk_id):
+                nonlocal state
+                if state % 2 != int(chunk_id not in self.chunks):
+                    state += 1
+                return state
+
+            for state, items in groupby(archive[b'items'], missing_chunk_detector):
+                if state != prev_state:
+                    unpacker = msgpack.Unpacker()
+                    prev_state = state
+                if state % 2:
+                    self.report_progress('Archive metadata damage detected', error=True)
+                    return
+                items = list(items)
+                for i, (chunk_id, cdata) in enumerate(zip(items, self.repository.get_many(items))):
+                    data = self.key.decrypt(chunk_id, cdata)
+                    if state and i == 0:
+                        data = msgpack_resync(data)
+                    unpacker.feed(data)
+                    for item in unpacker:
+                        yield item
+
+        for name, info in list(self.manifest.archives.items()):
+            self.report_progress('Analyzing archive: ' + name)
+            archive_id = info[b'id']
+            if archive_id not in self.chunks:
+                self.report_progress('Archive metadata block is missing', error=True)
+                del self.manifest.archives[name]
+                continue
+            items_buffer = ChunkBuffer(self.key)
+            items_buffer.write_chunk = add_callback
+            cdata = self.repository.get(archive_id)
+            data = self.key.decrypt(archive_id, cdata)
+            archive = msgpack.unpackb(data)
+            if archive[b'version'] != 1:
+                raise Exception('Unknown archive metadata version')
+            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
+            for item in robust_iterator(archive):
+                if b'chunks' in item:
+                    verify_file_chunks(item)
+                items_buffer.add(item)
+            items_buffer.flush(flush=True)
+            for previous_item_id in archive[b'items']:
+                record_unused(previous_item_id)
+            archive[b'items'] = items_buffer.chunks
+            data = msgpack.packb(archive, unicode_errors='surrogateescape')
+            new_archive_id = self.key.id_hash(data)
+            cdata = self.key.encrypt(data)
+            add_reference(new_archive_id, len(data), len(cdata), cdata)
+            record_unused(archive_id)
+            info[b'id'] = new_archive_id
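
ArchiveChecker builds a throwaway ChunkIndex of every object in the repository, rebuilds the manifest if its chunk is gone, substitutes all-zero replacement chunks for missing file chunks, and (in repair mode) deletes whatever ends up unreferenced. A sketch of how it is driven, mirroring do_check() in archiver.py below (the repository path is assumed):

    from attic.archive import ArchiveChecker
    from attic.repository import Repository

    repository = Repository('/path/to/repo')  # hypothetical path
    # Low-level segment/index check first, then the archive-level pass:
    if repository.check(progress=True, repair=False):
        ArchiveChecker().check(repository, progress=True, repair=False)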

+ 11 - 10
attic/archiver.py

@@ -7,7 +7,7 @@ import stat
 import sys
 
 from attic import __version__
-from attic.archive import Archive
+from attic.archive import Archive, ArchiveChecker
 from attic.repository import Repository
 from attic.cache import Cache
 from attic.key import key_creator
@@ -53,8 +53,7 @@ class Archiver:
         print('Initializing repository at "%s"' % args.repository.orig)
         repository = self.open_repository(args.repository, create=True)
         key = key_creator(repository, args)
-        manifest = Manifest()
-        manifest.repository = repository
+        manifest = Manifest(key, repository)
         manifest.key = key
         manifest.write()
         repository.commit()
@@ -65,10 +64,9 @@ class Archiver:
         """
         repository = self.open_repository(args.repository)
         if args.repair:
-            while True:
-                self.print_error("""Warning: check --repair is an experimental feature that might result
-in data loss. Checking and repairing archive metadata consistency is not yet
-supported so some types of corruptions will be undetected and not repaired.
+            while not os.environ.get('ATTIC_CHECK_I_KNOW_WHAT_I_AM_DOING'):
+                self.print_error("""Warning: 'check --repair' is an experimental feature that might result
+in data loss.
 
 Type "Yes I am sure" if you understand this and want to continue.\n""")
                 if input('Do you want to continue? ') == 'Yes I am sure':
@@ -76,8 +74,11 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
         if args.progress is None:
             args.progress = sys.stdout.isatty() or args.verbose
         if not repository.check(progress=args.progress, repair=args.repair):
-            self.exit_code = 1
-        return self.exit_code
+            return 1
+
+        if not ArchiveChecker().check(repository, progress=args.progress, repair=args.repair):
+            return 1
+        return 0
 
     def do_change_passphrase(self, args):
         """Change repository key file passphrase
@@ -85,7 +86,7 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
         repository = self.open_repository(args.repository)
         manifest, key = Manifest.load(repository)
         key.change_passphrase()
-        return self.exit_code
+        return 0
 
     def do_create(self, args):
         """Create new archive

+ 3 - 3
attic/cache.py

@@ -16,17 +16,17 @@ class Cache(object):
     class RepositoryReplay(Error):
         """Cache is newer than repository, refusing to continue"""
 
-    def __init__(self, repository, key, manifest):
+    def __init__(self, repository, key, manifest, path=None, sync=True):
         self.timestamp = None
         self.txn_active = False
         self.repository = repository
         self.key = key
         self.manifest = manifest
-        self.path = os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
+        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
         if not os.path.exists(self.path):
             self.create()
         self.open()
-        if self.manifest.id != self.manifest_id:
+        if sync and self.manifest.id != self.manifest_id:
             # If repository is older than the cache something fishy is going on
             if self.timestamp and self.timestamp > manifest.timestamp:
                 raise self.RepositoryReplay()
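
The new path and sync keyword arguments let a caller such as a consistency checker build a private cache in a scratch directory without triggering the manifest synchronization logic. A sketch, assuming repository, key and manifest are already loaded:

    cache = Cache(repository, key, manifest,
                  path='/tmp/attic-scratch-cache',  # hypothetical location
                  sync=False)                       # skip the manifest-id comparison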

+ 6 - 5
attic/helpers.py

@@ -56,17 +56,18 @@ class Manifest:
 
     MANIFEST_ID = b'\0' * 32
 
-    def __init__(self):
+    def __init__(self, key, repository):
         self.archives = {}
         self.config = {}
+        self.key = key
+        self.repository = repository
 
     @classmethod
     def load(cls, repository):
         from .key import key_factory
-        manifest = cls()
-        manifest.repository = repository
-        cdata = repository.get(manifest.MANIFEST_ID)
-        manifest.key = key = key_factory(repository, cdata)
+        cdata = repository.get(cls.MANIFEST_ID)
+        key = key_factory(repository, cdata)
+        manifest = cls(key, repository)
         data = key.decrypt(None, cdata)
         manifest.id = key.id_hash(data)
         m = msgpack.unpackb(data)
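
After this change there are two ways to obtain a Manifest: construct an empty one when initializing or rebuilding a repository, or load the existing one, which also derives the key from the stored manifest chunk. In sketch form:

    manifest = Manifest(key, repository)         # fresh, empty manifest (init/rebuild)
    manifest, key = Manifest.load(repository)    # existing manifest plus matching key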

+ 2 - 1
attic/repository.py

@@ -202,6 +202,7 @@ class Repository(object):
                 sys.stderr.flush()
 
         assert not self._active_txn
+        report_progress('Starting repository check...')
         index_transaction_id = self.get_index_transaction_id()
         segments_transaction_id = self.io.get_segments_transaction_id(index_transaction_id)
         if index_transaction_id is None and segments_transaction_id is None:
@@ -271,7 +272,7 @@ class Repository(object):
         if current_index and len(current_index) != len(self.index):
             report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True)
         if not error_found:
-            report_progress('Check complete, no errors found.')
+            report_progress('Repository check complete, no errors found.')
         if repair:
             self.write_index()
         else:

+ 2 - 2
attic/testsuite/archive.py

@@ -1,6 +1,6 @@
 import msgpack
 from attic.testsuite import AtticTestCase
-from attic.archive import ChunkBuffer
+from attic.archive import CacheChunkBuffer
 from attic.key import PlaintextKey
 
 
@@ -20,7 +20,7 @@ class ChunkBufferTestCase(AtticTestCase):
         data = [{b'foo': 1}, {b'bar': 2}]
         cache = MockCache()
         key = PlaintextKey()
-        chunks = ChunkBuffer(cache, key, None)
+        chunks = CacheChunkBuffer(cache, key, None)
         for d in data:
             chunks.add(d)
             chunks.flush()
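
The MockCache used above is defined earlier in this test module and is unchanged by the commit; roughly, it records chunks in a dict so write_chunk() can be exercised without a repository (a sketch, not the verbatim definition):

    class MockCache:
        def __init__(self):
            self.objects = {}

        def add_chunk(self, id, data, stats=None):
            self.objects[id] = data
            return id, len(data), len(data)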

+ 69 - 2
attic/testsuite/archiver.py

@@ -9,7 +9,9 @@ import time
 import unittest
 from hashlib import sha256
 from attic import xattr
+from attic.archive import Archive
 from attic.archiver import Archiver
+from attic.helpers import Manifest
 from attic.repository import Repository
 from attic.testsuite import AtticTestCase
 from attic.crypto import bytes_to_long, num_aes_blocks
@@ -20,7 +22,7 @@ try:
 except ImportError:
     has_llfuse = False
 
-src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..', '..')
+src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..')
 
 
 class changedir:
@@ -35,11 +37,12 @@ class changedir:
         os.chdir(self.old)
 
 
-class ArchiverTestCase(AtticTestCase):
+class ArchiverTestCaseBase(AtticTestCase):
 
     prefix = ''
 
     def setUp(self):
+        os.environ['ATTIC_CHECK_I_KNOW_WHAT_I_AM_DOING'] = '1'
         self.archiver = Archiver()
         self.tmpdir = tempfile.mkdtemp()
         self.repository_path = os.path.join(self.tmpdir, 'repository')
@@ -96,6 +99,9 @@ class ArchiverTestCase(AtticTestCase):
     def create_src_archive(self, name):
         self.attic('create', self.repository_location + '::' + name, src_dir)
 
+
+class ArchiverTestCase(ArchiverTestCaseBase):
+
     def create_regual_file(self, name, size=0):
         filename = os.path.join(self.input_path, name)
         if not os.path.exists(os.path.dirname(filename)):
@@ -299,6 +305,67 @@ class ArchiverTestCase(AtticTestCase):
         self.verify_aes_counter_uniqueness('passphrase')
 
 
+class ArchiverCheckTestCase(ArchiverTestCaseBase):
+
+    def setUp(self):
+        super(ArchiverCheckTestCase, self).setUp()
+        self.attic('init', self.repository_location)
+        self.create_src_archive('archive1')
+        self.create_src_archive('archive2')
+
+    def open_archive(self, name):
+        repository = Repository(self.repository_path)
+        manifest, key = Manifest.load(repository)
+        archive = Archive(repository, key, manifest, name)
+        return archive, repository
+
+    def test_missing_file_chunk(self):
+        archive, repository = self.open_archive('archive1')
+        for item in archive.iter_items():
+            if item[b'path'].endswith('testsuite/archiver.py'):
+                repository.delete(item[b'chunks'][-1][0])
+                break
+        repository.commit()
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', '--repair', self.repository_location, exit_code=0)
+        self.attic('check', self.repository_location, exit_code=0)
+
+    def test_missing_archive_item_chunk(self):
+        archive, repository = self.open_archive('archive1')
+        repository.delete(archive.metadata[b'items'][-1])
+        repository.commit()
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', '--repair', self.repository_location, exit_code=0)
+        self.attic('check', self.repository_location, exit_code=0)
+
+    def test_missing_archive_metadata(self):
+        archive, repository = self.open_archive('archive1')
+        repository.delete(archive.id)
+        repository.commit()
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', '--repair', self.repository_location, exit_code=0)
+        self.attic('check', self.repository_location, exit_code=0)
+
+    def test_missing_manifest(self):
+        archive, repository = self.open_archive('archive1')
+        repository.delete(Manifest.MANIFEST_ID)
+        repository.commit()
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', '--repair', '--progress', self.repository_location, exit_code=0)
+        self.attic('check', '--progress', self.repository_location, exit_code=0)
+
+    def test_extra_chunks(self):
+        self.attic('check', self.repository_location, exit_code=0)
+        repository = Repository(self.repository_location)
+        repository.put(b'01234567890123456789012345678901', b'xxxx')
+        repository.commit()
+        repository.close()
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', self.repository_location, exit_code=1)
+        self.attic('check', '--repair', self.repository_location, exit_code=0)
+        self.attic('check', self.repository_location, exit_code=0)
+        self.attic('verify', self.repository_location + '::archive1', exit_code=0)
+
 
 class RemoteArchiverTestCase(ArchiverTestCase):
     prefix = '__testsuite__:'