"""Tests for CacheChunkBuffer and RobustUnpacker from attic.archive."""
import msgpack
from attic.testsuite import AtticTestCase
from attic.archive import CacheChunkBuffer, RobustUnpacker
from attic.key import PlaintextKey


class MockCache:
    """In-memory stand-in for a chunk cache: every added chunk is kept in a dict."""

    def __init__(self):
        # Maps chunk id -> chunk data exactly as passed to add_chunk().
        self.objects = {}

    def add_chunk(self, id, data, stats=None):
        """Store ``data`` under ``id`` and return ``(id, len(data), len(data))``.

        ``stats`` is accepted but never used here (presumably it mirrors the
        signature of the real cache -- confirm against attic's cache class).
        """
        self.objects[id] = data
        size = len(data)
        return id, size, size


class ChunkBufferTestCase(AtticTestCase):
    """Round-trip check for CacheChunkBuffer backed by MockCache."""

    def test(self):
        # Add two items (flushing after each one, then once more with
        # flush=True), expect exactly two chunks to be recorded, and check that
        # the stored chunk payloads unpack back into the original items.
        items = [{b'foo': 1}, {b'bar': 2}]
        cache = MockCache()
        buffer = CacheChunkBuffer(cache, PlaintextKey(), None)
        for item in items:
            buffer.add(item)
            buffer.flush()
        buffer.flush(flush=True)
        self.assert_equal(len(buffer.chunks), 2)

        unpacker = msgpack.Unpacker()
        for chunk_id in buffer.chunks:
            unpacker.feed(cache.objects[chunk_id])
        self.assert_equal(items, list(unpacker))


class RobustUnpackerTestCase(AtticTestCase):
    """Checks RobustUnpacker on clean, truncated and corrupted input streams."""

    def make_chunks(self, items):
        """Return the concatenated msgpack encoding of ``{'path': item}`` per item."""
        packed = [msgpack.packb({'path': item}) for item in items]
        return b''.join(packed)

    def _validator(self, value):
        """Accept only dicts whose ``b'path'`` entry is one of the test paths."""
        if not isinstance(value, dict):
            return False
        return value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')

    def _items(self, *paths):
        """Build the expected list of ``{b'path': path}`` dicts for ``paths``."""
        return [{b'path': path} for path in paths]

    def process(self, input):
        """Run ``input`` through a RobustUnpacker and return every item it yields.

        ``input`` is an iterable of ``(should_sync, chunks)`` pairs.  When
        ``should_sync`` is true, ``resync()`` is called on the unpacker before
        that pair's chunks are fed to it.
        """
        unpacker = RobustUnpacker(validator=self._validator)
        collected = []
        for should_sync, chunks in input:
            if should_sync:
                unpacker.resync()
            for chunk in chunks:
                unpacker.feed(chunk)
                # Drain everything the unpacker can yield after this chunk.
                collected.extend(unpacker)
        return collected

    def test_extra_garbage_no_sync(self):
        # No resync is requested, so the garbage is not skipped: the expected
        # output contains each byte of b'garbage' as a separate integer
        # between the valid items.
        head = self.make_chunks([b'foo', b'bar'])
        tail = self.make_chunks([b'boo', b'baz'])
        stream = [(False, [head]), (False, [b'garbage', tail])]
        result = self.process(stream)
        expected = (self._items(b'foo', b'bar')
                    + list(b'garbage')
                    + self._items(b'boo', b'baz'))
        self.assert_equal(result, expected)

    def split(self, left, length):
        """Cut ``left`` into consecutive slices of at most ``length`` elements."""
        pieces = []
        remaining = left
        while remaining:
            head, remaining = remaining[:length], remaining[length:]
            pieces.append(head)
        return pieces

    def test_correct_stream(self):
        # An intact stream delivered in 2-byte pieces yields all four items.
        packed = self.make_chunks([b'foo', b'bar', b'boo', b'baz'])
        stream = [(False, self.split(packed, 2))]
        result = self.process(stream)
        self.assert_equal(result, self._items(b'foo', b'bar', b'boo', b'baz'))

    def test_missing_chunk(self):
        # The piece at index 3 is never fed; a resync is requested before the
        # remaining pieces.  The expected result lacks the 'bar' item.
        pieces = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        stream = [(False, pieces[:3]), (True, pieces[4:])]
        result = self.process(stream)
        self.assert_equal(result, self._items(b'foo', b'boo', b'baz'))

    def test_corrupt_chunk(self):
        # Garbage pieces are injected after the first three pieces, with a
        # resync requested before them.  The expected result lacks the 'bar'
        # item.
        pieces = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        stream = [(False, pieces[:3]), (True, [b'gar', b'bage'] + pieces[3:])]
        result = self.process(stream)
        self.assert_equal(result, self._items(b'foo', b'boo', b'baz'))