
check: archive metadata recovery improvements

Jonas Borgström 11 years ago
parent
commit
9e8a944a2a
3 changed files with 120 additions and 34 deletions
  1. attic/archive.py  +59 -27
  2. attic/testsuite/archive.py  +53 -1
  3. attic/testsuite/archiver.py  +8 -6

+ 59 - 27
attic/archive.py

@@ -402,6 +402,53 @@ class Archive:
             yield Archive(repository, key, manifest, name, cache=cache)
 
 
+class RobustUnpacker():
+    """A restartable/robust version of the streaming msgpack unpacker
+    """
+    def __init__(self, validator):
+        super(RobustUnpacker, self).__init__()
+        self.validator = validator
+        self._buffered_data = []
+        self._resync = False
+        self._skip = 0
+        self._unpacker = msgpack.Unpacker(object_hook=StableDict)
+
+    def resync(self):
+        self._buffered_data = []
+        self._resync = True
+        self._skip = 0
+
+    def feed(self, data):
+        if self._resync:
+            self._buffered_data.append(data)
+        else:
+            self._unpacker.feed(data)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self._resync:
+            while self._resync:
+                data = b''.join(self._buffered_data)[self._skip:]
+                if not data:
+                    raise StopIteration
+                temp_unpacker = msgpack.Unpacker()
+                temp_unpacker.feed(data)
+                for item in temp_unpacker:
+                    if self.validator(item):
+                        self._resync = False
+                        self._unpacker = msgpack.Unpacker(object_hook=StableDict)
+                        self.feed(b''.join(self._buffered_data)[self._skip:])
+                        return self._unpacker.__next__()
+                    else:
+                        self._skip += 1
+                else:
+                    raise StopIteration
+        else:
+            return self._unpacker.__next__()
+
+
 class ArchiveChecker:
 
     def __init__(self):
@@ -527,38 +574,23 @@ class ArchiveChecker:
                 offset += size
             item[b'chunks'] = chunk_list
 
-        def msgpack_resync(data):
-            data = memoryview(data)
-            while data:
-                unpacker = msgpack.Unpacker()
-                unpacker.feed(data)
-                item = next(unpacker)
-                if isinstance(item, dict) and b'path' in item:
-                    return data
-                data = data[1:]
-
         def robust_iterator(archive):
-            prev_state = None
-            state = 0
+            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
+            _state = 0
             def missing_chunk_detector(chunk_id):
-                nonlocal state
-                if state % 2 != int(not chunk_id in self.chunks):
-                    state += 1
-                return state
-
+                nonlocal _state
+                if _state % 2 != int(not chunk_id in self.chunks):
+                    _state += 1
+                return _state
             for state, items in groupby(archive[b'items'], missing_chunk_detector):
-                if state != prev_state:
-                    unpacker = msgpack.Unpacker(object_hook=StableDict)
-                    prev_state = state
+                items = list(items)
                 if state % 2:
                     self.report_progress('Archive metadata damage detected', error=True)
-                    return
-                items = list(items)
-                for i, (chunk_id, cdata) in enumerate(zip(items, self.repository.get_many(items))):
-                    data = self.key.decrypt(chunk_id, cdata)
-                    if state and i == 0:
-                        data = msgpack_resync(data)
-                    unpacker.feed(data)
+                    continue
+                if state > 0:
+                    unpacker.resync()
+                for chunk_id, cdata in zip(items, self.repository.get_many(items)):
+                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                     for item in unpacker:
                         yield item
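
For orientation, a minimal usage sketch of the new RobustUnpacker (illustrative only, not part of the commit). It mirrors what robust_iterator above does: feed decrypted chunks normally, call resync() after damage, and let the validator decide where valid items start again. It assumes plain byte strings round-trip unchanged through the installed msgpack version, the same assumption the new tests below rely on.

    import msgpack
    from attic.archive import RobustUnpacker

    packed = [msgpack.packb(item) for item in (b'foo', b'bar', b'boo', b'baz')]

    unpacker = RobustUnpacker(lambda item: item in (b'foo', b'bar', b'boo', b'baz'))
    # Normal streaming: feed data, iterate items as they become complete.
    unpacker.feed(packed[0] + packed[1])
    print(list(unpacker))        # [b'foo', b'bar']

    # Simulate a damaged/missing chunk: resync(), then keep feeding. The unpacker
    # buffers the data and skips leading garbage until the validator accepts an
    # item, then resumes normal streaming from that point.
    unpacker.resync()
    unpacker.feed(b'garbage' + packed[2] + packed[3])
    print(list(unpacker))        # [b'boo', b'baz']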
 

+ 53 - 1
attic/testsuite/archive.py

@@ -1,6 +1,6 @@
 import msgpack
 from attic.testsuite import AtticTestCase
-from attic.archive import CacheChunkBuffer
+from attic.archive import CacheChunkBuffer, RobustUnpacker
 from attic.key import PlaintextKey
 
 
@@ -30,3 +30,55 @@ class ChunkBufferTestCase(AtticTestCase):
         for id in chunks.chunks:
             unpacker.feed(cache.objects[id])
         self.assert_equal(data, list(unpacker))
+
+
+class RobustUnpackerTestCase(AtticTestCase):
+
+    def make_chunks(self, items):
+        return b''.join(msgpack.packb(item) for item in items)
+
+    def _validator(self, value):
+        return value in (b'foo', b'bar', b'boo', b'baz')
+
+    def process(self, input):
+        unpacker = RobustUnpacker(validator=self._validator)
+        result = []
+        for should_sync, chunks in input:
+            if should_sync:
+                unpacker.resync()
+            for data in chunks:
+                unpacker.feed(data)
+                for item in unpacker:
+                    result.append(item)
+        return result
+
+    def test_extra_garbage_no_sync(self):
+        chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
+                  (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
+        result = self.process(chunks)
+        self.assert_equal(result, [b'foo', b'bar', 103, 97, 114, 98, 97, 103, 101, b'boo', b'baz'])
+
+    def split(self, left, length):
+        parts = []
+        while left:
+            parts.append(left[:length])
+            left = left[length:]
+        return parts
+
+    def test_correct_stream(self):
+        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
+        input = [(False, chunks)]
+        result = self.process(input)
+        self.assert_equal(result, [b'foo', b'bar', b'boo', b'baz'])
+
+    def test_missing_chunk(self):
+        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
+        input = [(False, chunks[:3]), (True, chunks[4:])]
+        result = self.process(input)
+        self.assert_equal(result, [b'foo', b'boo', b'baz'])
+
+    def test_corrupt_chunk(self):
+        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
+        input = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
+        result = self.process(input)
+        self.assert_equal(result, [b'foo', b'boo', b'baz'])
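
The new RobustUnpackerTestCase covers the situations robust_iterator has to survive: a clean stream split at arbitrary boundaries, stray garbage without a resync, a missing chunk, and a corrupted chunk. As a usage note (assuming the attic package and its C extensions are importable from the working tree), the case can be run on its own with the stock unittest runner:

    python -m unittest -v attic.testsuite.archive.RobustUnpackerTestCase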

+ 8 - 6
attic/testsuite/archiver.py

@@ -9,12 +9,13 @@ import time
 import unittest
 from hashlib import sha256
 from attic import xattr
-from attic.archive import Archive
+from attic.archive import Archive, ChunkBuffer
 from attic.archiver import Archiver
+from attic.crypto import bytes_to_long, num_aes_blocks
 from attic.helpers import Manifest
 from attic.repository import Repository
 from attic.testsuite import AtticTestCase
-from attic.crypto import bytes_to_long, num_aes_blocks
+from attic.testsuite.mock import patch
 
 try:
     import llfuse
@@ -328,9 +329,10 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
     def setUp(self):
         super(ArchiverCheckTestCase, self).setUp()
-        self.attic('init', self.repository_location)
-        self.create_src_archive('archive1')
-        self.create_src_archive('archive2')
+        with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10):
+            self.attic('init', self.repository_location)
+            self.create_src_archive('archive1')
+            self.create_src_archive('archive2')
 
     def open_archive(self, name):
         repository = Repository(self.repository_path)
@@ -351,7 +353,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
     def test_missing_archive_item_chunk(self):
         archive, repository = self.open_archive('archive1')
-        repository.delete(archive.metadata[b'items'][-1])
+        repository.delete(archive.metadata[b'items'][-5])
         repository.commit()
         self.attic('check', self.repository_location, exit_code=1)
         self.attic('check', '--repair', self.repository_location, exit_code=0)
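
Two details of the test setup are worth noting: patching ChunkBuffer.BUFFER_SIZE down to 10 makes the test archives spread their item metadata over many small chunks instead of a single one, and deleting archive.metadata[b'items'][-5] (rather than the last entry) removes a chunk from the middle of that stream, so the repair path actually has to resynchronise on the chunks that follow. A small sketch of the patching pattern, assuming the attic.testsuite.mock shim resolves to unittest.mock (or the mock backport):

    from unittest.mock import patch          # presumably what attic.testsuite.mock wraps
    from attic.archive import ChunkBuffer

    with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10):
        # While the block is active, the class attribute is overridden, so even
        # tiny archives should spill their item metadata across multiple chunks.
        assert ChunkBuffer.BUFFER_SIZE == 10
    # The original attribute value is restored when the block exits.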