import msgpack

from attic.testsuite import AtticTestCase
from attic.archive import CacheChunkBuffer, RobustUnpacker
from attic.key import PlaintextKey, COMPR_DEFAULT
 
class MockCache:
    """In-memory stand-in for the real chunk cache used by the buffer tests."""

    def __init__(self):
        # Maps chunk id -> raw chunk bytes.
        self.objects = {}

    def add_chunk(self, id, data, stats=None):
        """Store *data* under *id*; mimic the real cache's (id, size, csize) return.

        *stats* is accepted for interface compatibility and ignored.
        """
        self.objects[id] = data
        size = len(data)
        return id, size, size
 
class ChunkBufferTestCase(AtticTestCase):
    """Round-trip items through a CacheChunkBuffer backed by a MockCache."""

    class MockArgs(object):
        # Minimal argument object with the attributes PlaintextKey.create() reads.
        repository = None
        compression = COMPR_DEFAULT
        mac = None
        cipher = None

    def test(self):
        data = [{b'foo': 1}, {b'bar': 2}]
        cache = MockCache()
        key = PlaintextKey.create(None, self.MockArgs())
        chunks = CacheChunkBuffer(cache, key, None)
        # Add each item, flushing after every add so items land in separate chunks.
        for item in data:
            chunks.add(item)
            chunks.flush()
        # Final forced flush drains anything still buffered.
        chunks.flush(flush=True)
        self.assert_equal(len(chunks.chunks), 2)
        # Unpacking the concatenated stored chunks must reproduce the input items.
        unpacker = msgpack.Unpacker()
        unpacker.feed(b''.join(cache.objects[chunk_id] for chunk_id in chunks.chunks))
        self.assert_equal(data, list(unpacker))
 
class RobustUnpackerTestCase(AtticTestCase):
    """Verify RobustUnpacker skips garbage and resynchronizes after resync()."""

    def make_chunks(self, items):
        """Pack each item as a ``{'path': item}`` msgpack map and concatenate."""
        return b''.join(msgpack.packb({'path': item}) for item in items)

    def _validator(self, value):
        # Accept only dicts whose b'path' is one of the known test names.
        if not isinstance(value, dict):
            return False
        return value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')

    def process(self, input):
        """Feed *input* — a list of (should_sync, chunks) pairs — and collect items."""
        unpacker = RobustUnpacker(validator=self._validator)
        result = []
        for should_sync, chunks in input:
            if should_sync:
                unpacker.resync()
            for data in chunks:
                unpacker.feed(data)
                result.extend(unpacker)
        return result

    def test_extra_garbage_no_sync(self):
        # Without resync, raw garbage bytes surface as individual msgpack ints.
        sequence = [(False, [self.make_chunks([b'foo', b'bar'])]),
                    (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
        result = self.process(sequence)
        self.assert_equal(result, [
            {b'path': b'foo'}, {b'path': b'bar'},
            103, 97, 114, 98, 97, 103, 101,
            {b'path': b'boo'},
            {b'path': b'baz'}])

    def split(self, left, length):
        """Split *left* into consecutive pieces of at most *length* bytes."""
        return [left[offset:offset + length]
                for offset in range(0, len(left), length)]

    def test_correct_stream(self):
        # A clean stream split at arbitrary boundaries unpacks fully.
        pieces = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
        result = self.process([(False, pieces)])
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'},
                                   {b'path': b'boo'}, {b'path': b'baz'}])

    def test_missing_chunk(self):
        # Dropping a chunk and resyncing loses only the item spanning the gap.
        pieces = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        sequence = [(False, pieces[:3]), (True, pieces[4:])]
        result = self.process(sequence)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'},
                                   {b'path': b'baz'}])

    def test_corrupt_chunk(self):
        # Garbage injected before resync is skipped; only the torn item is lost.
        pieces = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        sequence = [(False, pieces[:3]), (True, [b'gar', b'bage'] + pieces[3:])]
        result = self.process(sequence)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'},
                                   {b'path': b'baz'}])
 
 
  |