chunker.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # Note: these tests are part of the self test, do not use or import pytest functionality here.
  2. # See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
  3. from io import BytesIO
  4. from ..chunker import Chunker, buzhash, buzhash_update
  5. from ..constants import * # NOQA
  6. from . import BaseTestCase
  7. class ChunkerTestCase(BaseTestCase):
  8. def test_chunkify(self):
  9. data = b'0' * int(1.5 * (1 << CHUNK_MAX_EXP)) + b'Y'
  10. parts = [bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))]
  11. self.assert_equal(len(parts), 2)
  12. self.assert_equal(b''.join(parts), data)
  13. self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b''))], [])
  14. self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fooba', b'rboobaz', b'fooba', b'rboobaz', b'fooba', b'rboobaz'])
  15. self.assert_equal([bytes(c) for c in Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fo', b'obarb', b'oob', b'azf', b'oobarb', b'oob', b'azf', b'oobarb', b'oobaz'])
  16. self.assert_equal([bytes(c) for c in Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'ar', b'boobazfoob', b'ar', b'boobazfoob', b'ar', b'boobaz'])
  17. self.assert_equal([bytes(c) for c in Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
  18. self.assert_equal([bytes(c) for c in Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobar', b'boobazfo', b'obar', b'boobazfo', b'obar', b'boobaz'])
  19. self.assert_equal([bytes(c) for c in Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'arboobaz', b'foob', b'arboobaz', b'foob', b'arboobaz'])
  20. self.assert_equal([bytes(c) for c in Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
  21. self.assert_equal([bytes(c) for c in Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarbo', b'obazfoobar', b'boobazfo', b'obarboobaz'])
  22. self.assert_equal([bytes(c) for c in Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz', b'foobarboobaz', b'foobarboobaz'])
  23. def test_buzhash(self):
  24. self.assert_equal(buzhash(b'abcdefghijklmnop', 0), 3795437769)
  25. self.assert_equal(buzhash(b'abcdefghijklmnop', 1), 3795400502)
  26. self.assert_equal(buzhash(b'abcdefghijklmnop', 1), buzhash_update(buzhash(b'Xabcdefghijklmno', 1), ord('X'), ord('p'), 16, 1))
  27. # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
  28. self.assert_equal(buzhash(b'abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz', 0), 566521248)
  29. def test_small_reads(self):
  30. class SmallReadFile:
  31. input = b'a' * (20 + 1)
  32. def read(self, nbytes):
  33. self.input = self.input[:-1]
  34. return self.input[:1]
  35. reconstructed = b''.join(Chunker(0, *CHUNKER_PARAMS).chunkify(SmallReadFile()))
  36. assert reconstructed == b'a' * 20