# archive.py
import msgpack

from attic.archive import ChunkBuffer
from attic.key import PlaintextKey
from attic.testsuite import AtticTestCase
  5. class MockCache:
  6. def __init__(self):
  7. self.objects = {}
  8. def add_chunk(self, id, data, stats=None):
  9. self.objects[id] = data
  10. return id, len(data), len(data)
  11. class ChunkBufferTestCase(AtticTestCase):
  12. def test(self):
  13. data = [{b'foo': 1}, {b'bar': 2}]
  14. cache = MockCache()
  15. key = PlaintextKey()
  16. chunks = ChunkBuffer(cache, key, None)
  17. for d in data:
  18. chunks.add(d)
  19. chunks.flush()
  20. chunks.flush(flush=True)
  21. self.assert_equal(len(chunks.chunks), 2)
  22. unpacker = msgpack.Unpacker()
  23. for id in chunks.chunks:
  24. unpacker.feed(cache.objects[id])
  25. self.assert_equal(data, list(unpacker))