|
@@ -8,18 +8,32 @@ from . import BaseTestCase
|
|
|
# See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
|
|
|
|
|
|
|
|
|
+def cf(chunks):
|
|
|
+ """chunk filter"""
|
|
|
+ # this is to simplify testing: either return the data piece (bytes) or the hole length (int).
|
|
|
+ def _cf(chunk):
|
|
|
+ if chunk.meta['allocation'] == CH_DATA:
|
|
|
+ assert len(chunk.data) == chunk.meta['size']
|
|
|
+ return bytes(chunk.data) # make sure we have bytes, not memoryview
|
|
|
+ if chunk.meta['allocation'] in (CH_HOLE, CH_ALLOC):
|
|
|
+ assert chunk.data is None
|
|
|
+ return chunk.meta['size']
|
|
|
+ assert False, "unexpected allocation value"
|
|
|
+ return [_cf(chunk) for chunk in chunks]
|
|
|
+
|
|
|
+
|
|
|
class ChunkerFixedTestCase(BaseTestCase):
|
|
|
|
|
|
def test_chunkify_just_blocks(self):
|
|
|
data = b'foobar' * 1500
|
|
|
chunker = ChunkerFixed(4096)
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data))]
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data)))
|
|
|
self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])
|
|
|
|
|
|
def test_chunkify_header_and_blocks(self):
|
|
|
data = b'foobar' * 1500
|
|
|
chunker = ChunkerFixed(4096, 123)
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data))]
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data)))
|
|
|
self.assert_equal(parts, [data[0:123], data[123:123+4096], data[123+4096:123+8192], data[123+8192:]])
|
|
|
|
|
|
def test_chunkify_just_blocks_fmap_complete(self):
|
|
@@ -30,7 +44,7 @@ class ChunkerFixedTestCase(BaseTestCase):
|
|
|
(4096, 8192, True),
|
|
|
(8192, 99999999, True),
|
|
|
]
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data), fmap=fmap)]
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
|
|
|
self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])
|
|
|
|
|
|
def test_chunkify_header_and_blocks_fmap_complete(self):
|
|
@@ -42,7 +56,7 @@ class ChunkerFixedTestCase(BaseTestCase):
|
|
|
(123+4096, 4096, True),
|
|
|
(123+8192, 4096, True),
|
|
|
]
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data), fmap=fmap)]
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
|
|
|
self.assert_equal(parts, [data[0:123], data[123:123+4096], data[123+4096:123+8192], data[123+8192:]])
|
|
|
|
|
|
def test_chunkify_header_and_blocks_fmap_zeros(self):
|
|
@@ -54,9 +68,9 @@ class ChunkerFixedTestCase(BaseTestCase):
|
|
|
(123+4096, 4096, True),
|
|
|
(123+8192, 4096, False),
|
|
|
]
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data), fmap=fmap)]
|
|
|
- # because we marked the '_' ranges as holes, we will get '\0' ranges instead!
|
|
|
- self.assert_equal(parts, [data[0:123], b'\0' * 4096, data[123+4096:123+8192], b'\0' * 4096])
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
|
|
|
+ # because we marked the '_' ranges as holes, we will get hole ranges instead!
|
|
|
+ self.assert_equal(parts, [data[0:123], 4096, data[123+4096:123+8192], 4096])
|
|
|
|
|
|
def test_chunkify_header_and_blocks_fmap_partial(self):
|
|
|
data = b'H' * 123 + b'_' * 4096 + b'X' * 4096 + b'_' * 4096
|
|
@@ -67,7 +81,7 @@ class ChunkerFixedTestCase(BaseTestCase):
|
|
|
(123+4096, 4096, True),
|
|
|
# (123+8192, 4096, False),
|
|
|
]
|
|
|
- parts = [c for c in chunker.chunkify(BytesIO(data), fmap=fmap)]
|
|
|
+ parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
|
|
|
# because we left out the '_' ranges from the fmap, we will not get them at all!
|
|
|
self.assert_equal(parts, [data[0:123], data[123+4096:123+8192]])
|
|
|
|
|
@@ -76,19 +90,19 @@ class ChunkerTestCase(BaseTestCase):
|
|
|
|
|
|
def test_chunkify(self):
|
|
|
data = b'0' * int(1.5 * (1 << CHUNK_MAX_EXP)) + b'Y'
|
|
|
- parts = [bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))]
|
|
|
+ parts = cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data)))
|
|
|
self.assert_equal(len(parts), 2)
|
|
|
self.assert_equal(b''.join(parts), data)
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b''))], [])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fooba', b'rboobaz', b'fooba', b'rboobaz', b'fooba', b'rboobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fo', b'obarb', b'oob', b'azf', b'oobarb', b'oob', b'azf', b'oobarb', b'oobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'ar', b'boobazfoob', b'ar', b'boobazfoob', b'ar', b'boobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobar', b'boobazfo', b'obar', b'boobazfo', b'obar', b'boobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'arboobaz', b'foob', b'arboobaz', b'foob', b'arboobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarbo', b'obazfoobar', b'boobazfo', b'obarboobaz'])
|
|
|
- self.assert_equal([bytes(c) for c in Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz', b'foobarboobaz', b'foobarboobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b''))), [])
|
|
|
+ self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'fooba', b'rboobaz', b'fooba', b'rboobaz', b'fooba', b'rboobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'fo', b'obarb', b'oob', b'azf', b'oobarb', b'oob', b'azf', b'oobarb', b'oobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foob', b'ar', b'boobazfoob', b'ar', b'boobazfoob', b'ar', b'boobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foobarboobaz' * 3])
|
|
|
+ self.assert_equal(cf(Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foobar', b'boobazfo', b'obar', b'boobazfo', b'obar', b'boobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foob', b'arboobaz', b'foob', b'arboobaz', b'foob', b'arboobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foobarboobaz' * 3])
|
|
|
+ self.assert_equal(cf(Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foobarbo', b'obazfoobar', b'boobazfo', b'obarboobaz'])
|
|
|
+ self.assert_equal(cf(Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))), [b'foobarboobaz', b'foobarboobaz', b'foobarboobaz'])
|
|
|
|
|
|
def test_buzhash(self):
|
|
|
self.assert_equal(buzhash(b'abcdefghijklmnop', 0), 3795437769)
|
|
@@ -106,5 +120,5 @@ class ChunkerTestCase(BaseTestCase):
|
|
|
return self.input[:1]
|
|
|
|
|
|
chunker = get_chunker(*CHUNKER_PARAMS, seed=0)
|
|
|
- reconstructed = b''.join(chunker.chunkify(SmallReadFile()))
|
|
|
+ reconstructed = b''.join(cf(chunker.chunkify(SmallReadFile())))
|
|
|
assert reconstructed == b'a' * 20
|