2
0

archive.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import msgpack
  2. from attic.testsuite import AtticTestCase
  3. from attic.archive import CacheChunkBuffer, RobustUnpacker
  4. from attic.key import PlaintextKey
  class MockCache:
      """Minimal in-memory cache double used by the tests in this module.

      Stored chunks are kept in ``self.objects`` so a test can read back
      exactly what was handed to :meth:`add_chunk`.
      """

      def __init__(self):
          # Maps chunk id -> the data object passed to add_chunk().
          self.objects = {}

      def add_chunk(self, id, data, stats=None):
          """Record *data* under *id* and return ``(id, len(data), len(data))``.

          ``stats`` is accepted but not used by this mock.
          """
          # NOTE(review): the two equal lengths presumably stand in for the
          # (size, compressed size) pair of the real cache -- confirm.
          self.objects[id] = data
          size = len(data)
          return id, size, size
  class ChunkBufferTestCase(AtticTestCase):
      """Round-trip check for ``CacheChunkBuffer`` against a ``MockCache``."""

      def test(self):
          """Items added to the buffer can be unpacked again from the cache."""
          data = [{b'foo': 1}, {b'bar': 2}]
          cache = MockCache()
          key = PlaintextKey()
          chunks = CacheChunkBuffer(cache, key, None)
          for item in data:
              chunks.add(item)
              # NOTE(review): flush placement inside the loop is reconstructed
              # from the two-chunk expectation below -- confirm against the
              # original file.
              chunks.flush()
          # Force out whatever is still buffered.
          chunks.flush(flush=True)
          self.assert_equal(len(chunks.chunks), 2)
          # Feed every stored chunk back through msgpack, in buffer order.
          unpacker = msgpack.Unpacker()
          for chunk_id in chunks.chunks:
              unpacker.feed(cache.objects[chunk_id])
          self.assert_equal(data, list(unpacker))
  class RobustUnpackerTestCase(AtticTestCase):
      """Exercise ``RobustUnpacker`` on clean, truncated and corrupted streams."""

      def make_chunks(self, items):
          """Return the msgpack encodings of *items* joined into one bytes object."""
          return b''.join(msgpack.packb(item) for item in items)

      def _validator(self, value):
          """Return True only for the four byte strings the tests treat as valid."""
          return value in (b'foo', b'bar', b'boo', b'baz')

      def process(self, input):
          """Feed *input* through a ``RobustUnpacker`` and collect every item.

          *input* is an iterable of ``(should_sync, chunks)`` pairs; when
          ``should_sync`` is true, ``resync()`` is called before that pair's
          chunks are fed.
          """
          unpacker = RobustUnpacker(validator=self._validator)
          result = []
          for should_sync, chunks in input:
              if should_sync:
                  unpacker.resync()
              for data in chunks:
                  unpacker.feed(data)
                  # Drain whatever became decodable after this feed.
                  result.extend(unpacker)
          return result

      def test_extra_garbage_no_sync(self):
          """Without a resync, the bytes of b'garbage' come out as integers."""
          chunks = [
              (False, [self.make_chunks([b'foo', b'bar'])]),
              (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])]),
          ]
          result = self.process(chunks)
          # 103, 97, 114, 98, 97, 103, 101 are the byte values of b'garbage'.
          self.assert_equal(
              result,
              [b'foo', b'bar', 103, 97, 114, 98, 97, 103, 101, b'boo', b'baz'],
          )

      def split(self, left, length):
          """Cut *left* into consecutive slices of at most *length* elements."""
          return [left[start:start + length]
                  for start in range(0, len(left), length)]

      def test_correct_stream(self):
          """An intact stream fed in 2-byte pieces yields all four items."""
          chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
          stream = [(False, chunks)]
          result = self.process(stream)
          self.assert_equal(result, [b'foo', b'bar', b'boo', b'baz'])

      def test_missing_chunk(self):
          """Dropping piece 3 and resyncing loses only b'bar'."""
          chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
          # chunks[3] is deliberately skipped between the two segments.
          stream = [(False, chunks[:3]), (True, chunks[4:])]
          result = self.process(stream)
          self.assert_equal(result, [b'foo', b'boo', b'baz'])

      def test_corrupt_chunk(self):
          """Junk bytes fed after a resync are skipped; b'bar' is lost."""
          chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
          stream = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
          result = self.process(stream)
          self.assert_equal(result, [b'foo', b'boo', b'baz'])