archive.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import msgpack
  2. from attic.testsuite import AtticTestCase
  3. from attic.testsuite.mock import Mock
  4. from attic.archive import Archive, CacheChunkBuffer, RobustUnpacker
  5. from attic.key import PlaintextKey, COMPR_DEFAULT
  6. from attic.helpers import Manifest
  7. from datetime import datetime, timezone
  8. class MockCache:
  9. def __init__(self):
  10. self.objects = {}
  11. def add_chunk(self, id, data, stats=None):
  12. self.objects[id] = data
  13. return id, len(data), len(data)
  14. class ArchiveTimestampTestCase(AtticTestCase):
  15. class MockArgs(object):
  16. repository = None
  17. compression = COMPR_DEFAULT
  18. mac = None
  19. cipher = None
  20. def _test_timestamp_parsing(self, isoformat, expected):
  21. repository = Mock()
  22. key = PlaintextKey.create(None, self.MockArgs())
  23. manifest = Manifest(repository, key)
  24. a = Archive(repository, key, manifest, 'test', create=True)
  25. a.metadata = {b'time': isoformat}
  26. self.assert_equal(a.ts, expected)
  27. def test_with_microseconds(self):
  28. self._test_timestamp_parsing(
  29. '1970-01-01T00:00:01.000001',
  30. datetime(1970, 1, 1, 0, 0, 1, 1, timezone.utc))
  31. def test_without_microseconds(self):
  32. self._test_timestamp_parsing(
  33. '1970-01-01T00:00:01',
  34. datetime(1970, 1, 1, 0, 0, 1, 0, timezone.utc))
  35. class ChunkBufferTestCase(AtticTestCase):
  36. class MockArgs(object):
  37. repository = None
  38. compression = COMPR_DEFAULT
  39. mac = None
  40. cipher = None
  41. def test(self):
  42. data = [{b'foo': 1}, {b'bar': 2}]
  43. cache = MockCache()
  44. key = PlaintextKey.create(None, self.MockArgs())
  45. chunks = CacheChunkBuffer(cache, key, None)
  46. for d in data:
  47. chunks.add(d)
  48. chunks.flush()
  49. chunks.flush(flush=True)
  50. self.assert_equal(len(chunks.chunks), 2)
  51. unpacker = msgpack.Unpacker()
  52. for id in chunks.chunks:
  53. unpacker.feed(cache.objects[id])
  54. self.assert_equal(data, list(unpacker))
  55. class RobustUnpackerTestCase(AtticTestCase):
  56. def make_chunks(self, items):
  57. return b''.join(msgpack.packb({'path': item}) for item in items)
  58. def _validator(self, value):
  59. return isinstance(value, dict) and value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')
  60. def process(self, input):
  61. unpacker = RobustUnpacker(validator=self._validator)
  62. result = []
  63. for should_sync, chunks in input:
  64. if should_sync:
  65. unpacker.resync()
  66. for data in chunks:
  67. unpacker.feed(data)
  68. for item in unpacker:
  69. result.append(item)
  70. return result
  71. def test_extra_garbage_no_sync(self):
  72. chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
  73. (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
  74. result = self.process(chunks)
  75. self.assert_equal(result, [
  76. {b'path': b'foo'}, {b'path': b'bar'},
  77. 103, 97, 114, 98, 97, 103, 101,
  78. {b'path': b'boo'},
  79. {b'path': b'baz'}])
  80. def split(self, left, length):
  81. parts = []
  82. while left:
  83. parts.append(left[:length])
  84. left = left[length:]
  85. return parts
  86. def test_correct_stream(self):
  87. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
  88. input = [(False, chunks)]
  89. result = self.process(input)
  90. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'}, {b'path': b'boo'}, {b'path': b'baz'}])
  91. def test_missing_chunk(self):
  92. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  93. input = [(False, chunks[:3]), (True, chunks[4:])]
  94. result = self.process(input)
  95. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])
  96. def test_corrupt_chunk(self):
  97. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  98. input = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
  99. result = self.process(input)
  100. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])