# archive.py — tests for Archive, CacheChunkBuffer, RobustUnpacker and Statistics.
  1. import os
  2. from datetime import datetime, timezone
  3. from io import StringIO
  4. from unittest.mock import Mock
  5. import pytest
  6. import msgpack
  7. from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, Statistics
  8. from ..key import PlaintextKey
  9. from ..helpers import Manifest
  10. from . import BaseTestCase
  11. @pytest.fixture()
  12. def stats():
  13. stats = Statistics()
  14. stats.update(20, 10, unique=True)
  15. return stats
  16. def test_stats_basic(stats):
  17. assert stats.osize == 20
  18. assert stats.csize == stats.usize == 10
  19. stats.update(20, 10, unique=False)
  20. assert stats.osize == 40
  21. assert stats.csize == 20
  22. assert stats.usize == 10
  23. def tests_stats_progress(stats, columns=80):
  24. os.environ['COLUMNS'] = str(columns)
  25. out = StringIO()
  26. stats.show_progress(stream=out)
  27. s = '20 B O 10 B C 10 B D 0 N '
  28. buf = ' ' * (columns - len(s))
  29. assert out.getvalue() == s + buf + "\r"
  30. out = StringIO()
  31. stats.update(10**3, 0, unique=False)
  32. stats.show_progress(item={b'path': 'foo'}, final=False, stream=out)
  33. s = '1.02 kB O 10 B C 10 B D 0 N foo'
  34. buf = ' ' * (columns - len(s))
  35. assert out.getvalue() == s + buf + "\r"
  36. out = StringIO()
  37. stats.show_progress(item={b'path': 'foo'*40}, final=False, stream=out)
  38. s = '1.02 kB O 10 B C 10 B D 0 N foofoofoofoofoofoofoofo...oofoofoofoofoofoofoofoofoo'
  39. buf = ' ' * (columns - len(s))
  40. assert out.getvalue() == s + buf + "\r"
  41. def test_stats_format(stats):
  42. assert str(stats) == """\
  43. Original size Compressed size Deduplicated size
  44. This archive: 20 B 10 B 10 B"""
  45. s = "{0.osize_fmt}".format(stats)
  46. assert s == "20 B"
  47. # kind of redundant, but id is variable so we can't match reliably
  48. assert repr(stats) == '<Statistics object at {:#x} (20, 10, 10)>'.format(id(stats))
  49. class MockCache:
  50. def __init__(self):
  51. self.objects = {}
  52. def add_chunk(self, id, chunk, stats=None):
  53. self.objects[id] = chunk.data
  54. return id, len(chunk.data), len(chunk.data)
  55. class ArchiveTimestampTestCase(BaseTestCase):
  56. def _test_timestamp_parsing(self, isoformat, expected):
  57. repository = Mock()
  58. key = PlaintextKey(repository)
  59. manifest = Manifest(repository, key)
  60. a = Archive(repository, key, manifest, 'test', create=True)
  61. a.metadata = {b'time': isoformat}
  62. self.assert_equal(a.ts, expected)
  63. def test_with_microseconds(self):
  64. self._test_timestamp_parsing(
  65. '1970-01-01T00:00:01.000001',
  66. datetime(1970, 1, 1, 0, 0, 1, 1, timezone.utc))
  67. def test_without_microseconds(self):
  68. self._test_timestamp_parsing(
  69. '1970-01-01T00:00:01',
  70. datetime(1970, 1, 1, 0, 0, 1, 0, timezone.utc))
  71. class ChunkBufferTestCase(BaseTestCase):
  72. def test(self):
  73. data = [{b'foo': 1}, {b'bar': 2}]
  74. cache = MockCache()
  75. key = PlaintextKey(None)
  76. chunks = CacheChunkBuffer(cache, key, None)
  77. for d in data:
  78. chunks.add(d)
  79. chunks.flush()
  80. chunks.flush(flush=True)
  81. self.assert_equal(len(chunks.chunks), 2)
  82. unpacker = msgpack.Unpacker()
  83. for id in chunks.chunks:
  84. unpacker.feed(cache.objects[id])
  85. self.assert_equal(data, list(unpacker))
  86. def test_partial(self):
  87. big = b"0123456789" * 10000
  88. data = [{b'full': 1, b'data': big}, {b'partial': 2, b'data': big}]
  89. cache = MockCache()
  90. key = PlaintextKey(None)
  91. chunks = CacheChunkBuffer(cache, key, None)
  92. for d in data:
  93. chunks.add(d)
  94. chunks.flush(flush=False)
  95. # the code is expected to leave the last partial chunk in the buffer
  96. self.assert_equal(len(chunks.chunks), 3)
  97. self.assert_true(chunks.buffer.tell() > 0)
  98. # now really flush
  99. chunks.flush(flush=True)
  100. self.assert_equal(len(chunks.chunks), 4)
  101. self.assert_true(chunks.buffer.tell() == 0)
  102. unpacker = msgpack.Unpacker()
  103. for id in chunks.chunks:
  104. unpacker.feed(cache.objects[id])
  105. self.assert_equal(data, list(unpacker))
  106. class RobustUnpackerTestCase(BaseTestCase):
  107. def make_chunks(self, items):
  108. return b''.join(msgpack.packb({'path': item}) for item in items)
  109. def _validator(self, value):
  110. return isinstance(value, dict) and value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')
  111. def process(self, input):
  112. unpacker = RobustUnpacker(validator=self._validator)
  113. result = []
  114. for should_sync, chunks in input:
  115. if should_sync:
  116. unpacker.resync()
  117. for data in chunks:
  118. unpacker.feed(data)
  119. for item in unpacker:
  120. result.append(item)
  121. return result
  122. def test_extra_garbage_no_sync(self):
  123. chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
  124. (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
  125. result = self.process(chunks)
  126. self.assert_equal(result, [
  127. {b'path': b'foo'}, {b'path': b'bar'},
  128. 103, 97, 114, 98, 97, 103, 101,
  129. {b'path': b'boo'},
  130. {b'path': b'baz'}])
  131. def split(self, left, length):
  132. parts = []
  133. while left:
  134. parts.append(left[:length])
  135. left = left[length:]
  136. return parts
  137. def test_correct_stream(self):
  138. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
  139. input = [(False, chunks)]
  140. result = self.process(input)
  141. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'}, {b'path': b'boo'}, {b'path': b'baz'}])
  142. def test_missing_chunk(self):
  143. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  144. input = [(False, chunks[:3]), (True, chunks[4:])]
  145. result = self.process(input)
  146. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])
  147. def test_corrupt_chunk(self):
  148. chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
  149. input = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
  150. result = self.process(input)
  151. self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])