"""Tests for CacheChunkBuffer and RobustUnpacker from attic.archive."""
import msgpack
from attic.testsuite import AtticTestCase
from attic.archive import CacheChunkBuffer, RobustUnpacker
from attic.key import PlaintextKey, COMPR_DEFAULT


class MockCache:
    """In-memory stand-in for the chunk cache: keeps chunk data in a dict."""

    def __init__(self):
        # Maps chunk id -> raw chunk data exactly as passed to add_chunk().
        self.objects = {}

    def add_chunk(self, id, data, stats=None):
        """Store *data* under *id* and return ``(id, len(data), len(data))``.

        ``stats`` is accepted and ignored.
        NOTE(review): the two lengths presumably mirror the size fields of
        the real cache's return value -- confirm against the cache API.
        """
        self.objects[id] = data
        return id, len(data), len(data)


class ChunkBufferTestCase(AtticTestCase):
    """Round-trip test for CacheChunkBuffer backed by MockCache."""

    class MockArgs(object):
        """Argument object handed to ``PlaintextKey.create``."""
        repository = None
        compression = COMPR_DEFAULT
        mac = None

    def test(self):
        """Two items, each followed by flush(), end up as two chunks that
        decode back to the original items."""
        data = [{b'foo': 1}, {b'bar': 2}]
        cache = MockCache()
        key = PlaintextKey.create(None, self.MockArgs())
        chunks = CacheChunkBuffer(cache, key, None)
        for item in data:
            chunks.add(item)
            chunks.flush()
        chunks.flush(flush=True)
        self.assert_equal(len(chunks.chunks), 2)
        # Feed the stored chunks back through a plain msgpack unpacker.
        unpacker = msgpack.Unpacker()
        for chunk_id in chunks.chunks:
            unpacker.feed(cache.objects[chunk_id])
        self.assert_equal(data, list(unpacker))


class RobustUnpackerTestCase(AtticTestCase):
    """Tests for RobustUnpacker on clean, truncated and corrupted streams."""

    def make_chunks(self, items):
        """Return the concatenated msgpack encoding of ``{'path': item}``
        for every element of *items*."""
        return b''.join(msgpack.packb({'path': item}) for item in items)

    def _validator(self, value):
        """Accept only dicts whose ``b'path'`` is one of the known names."""
        return isinstance(value, dict) and value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')

    def process(self, input):
        """Run *input* through a RobustUnpacker and collect every item.

        *input* is an iterable of ``(should_sync, chunks)`` pairs. When
        ``should_sync`` is true, ``resync()`` is called before that pair's
        chunks are fed. The unpacker is drained after every single feed.
        """
        unpacker = RobustUnpacker(validator=self._validator)
        result = []
        for should_sync, chunks in input:
            if should_sync:
                unpacker.resync()
            for data in chunks:
                unpacker.feed(data)
                result.extend(unpacker)
        return result

    def test_extra_garbage_no_sync(self):
        """Without a resync, garbage bytes come out as ordinary values."""
        chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
                  (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
        result = self.process(chunks)
        self.assert_equal(result, [
            {b'path': b'foo'}, {b'path': b'bar'},
            # The byte values of b'garbage', each yielded as an int.
            103, 97, 114, 98, 97, 103, 101,
            {b'path': b'boo'},
            {b'path': b'baz'}])

    def split(self, left, length):
        """Cut *left* into consecutive pieces of at most *length* bytes."""
        return [left[start:start + length] for start in range(0, len(left), length)]

    def test_correct_stream(self):
        """An intact stream fed in 2-byte pieces yields every item."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
        stream = [(False, chunks)]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'}, {b'path': b'boo'}, {b'path': b'baz'}])

    def test_missing_chunk(self):
        """Dropping one 4-byte piece and resyncing loses only ``bar``."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        # chunks[3] is deliberately left out to simulate a missing chunk.
        stream = [(False, chunks[:3]), (True, chunks[4:])]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])

    def test_corrupt_chunk(self):
        """Garbage injected mid-stream is skipped after a resync; ``bar``,
        which straddles the injection point, is not recovered."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        stream = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])