archive.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import msgpack
  2. from attic.testsuite import AtticTestCase
  3. from attic.archive import CacheChunkBuffer, RobustUnpacker
  4. from attic.key import PlaintextKey, COMPR_DEFAULT
  5. class MockCache:
  6. def __init__(self):
  7. self.objects = {}
  8. def add_chunk(self, id, data, stats=None):
  9. self.objects[id] = data
  10. return id, len(data), len(data)
  11. class ChunkBufferTestCase(AtticTestCase):
  12. class MockArgs(object):
  13. repository = None
  14. compression = COMPR_DEFAULT
  15. mac = None
  16. cipher = None
  17. def test(self):
  18. data = [{b'foo': 1}, {b'bar': 2}]
  19. cache = MockCache()
  20. key = PlaintextKey.create(None, self.MockArgs())
  21. chunks = CacheChunkBuffer(cache, key, None)
  22. for d in data:
  23. chunks.add(d)
  24. chunks.flush()
  25. chunks.flush(flush=True)
  26. self.assert_equal(len(chunks.chunks), 2)
  27. unpacker = msgpack.Unpacker()
  28. for id in chunks.chunks:
  29. unpacker.feed(cache.objects[id])
  30. self.assert_equal(data, list(unpacker))
  31. class RobustUnpackerTestCase(AtticTestCase):
  32. def make_chunks(self, items):
  33. return b''.join(msgpack.packb({'path': item}) for item in items)
  34. def _validator(self, value):
  35. return isinstance(value, dict) and value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')
  36. def process(self, input):
  37. unpacker = RobustUnpacker(validator=self._validator)
  38. result = []
  39. for should_sync, chunks in input:
  40. if should_sync:
  41. unpacker.resync()
  42. for data in chunks:
  43. unpacker.feed(data)
  44. for item in unpacker:
  45. result.append(item)
  46. return result
  47. def test_extra_garbage_no_sync(self):
  48. chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
  49. (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
  50. result = self.process(chunks)
  51. self.assert_equal(result, [
  52. {b'path': b'foo'}, {b'path': b'bar'},
  53. 103, 97, 114, 98, 97, 103, 101,
  54. {b'path': b'boo'},
  55. {b'path': b'baz'}])
  56. def split(self, left, length):
  57. parts = []
  58. while left:
  59. parts.append(left[:length])
  60. left = left[length:]
  61. return parts
  62. def test_correct_stream(self):
  63. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
  64. input = [(False, chunks)]
  65. result = self.process(input)
  66. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'}, {b'path': b'boo'}, {b'path': b'baz'}])
  67. def test_missing_chunk(self):
  68. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  69. input = [(False, chunks[:3]), (True, chunks[4:])]
  70. result = self.process(input)
  71. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])
  72. def test_corrupt_chunk(self):
  73. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  74. input = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
  75. result = self.process(input)
  76. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])