Browse Source

chunkers: rename test modules

Thomas Waldmann committed 1 week ago
parent
commit
3c459f0ae7

+ 1 - 1
src/borg/selftest.py

@@ -22,7 +22,7 @@ import time
 from unittest import TestResult, TestSuite, defaultTestLoader
 
 from .testsuite.crypto_test import CryptoTestCase
-from .testsuite.chunkers.buzhash_test import ChunkerTestCase
+from .testsuite.chunkers.buzhash_self_test import ChunkerTestCase
 
 SELFTEST_CASES = [CryptoTestCase, ChunkerTestCase]
 

+ 0 - 69
src/borg/testsuite/chunkers/buzhash_pytest_test.py

@@ -1,69 +0,0 @@
-from hashlib import sha256
-from io import BytesIO
-import os
-
-from . import cf
-from ...chunkers import Chunker
-from ...constants import *  # NOQA
-from ...helpers import hex_to_bin
-
-
-def H(data):
-    return sha256(data).digest()
-
-
-def test_chunkpoints_unchanged():
-    def twist(size):
-        x = 1
-        a = bytearray(size)
-        for i in range(size):
-            x = (x * 1103515245 + 12345) & 0x7FFFFFFF
-            a[i] = x & 0xFF
-        return a
-
-    data = twist(100000)
-
-    runs = []
-    for winsize in (65, 129, HASH_WINDOW_SIZE, 7351):
-        for minexp in (4, 6, 7, 11, 12):
-            for maxexp in (15, 17):
-                if minexp >= maxexp:
-                    continue
-                for maskbits in (4, 7, 10, 12):
-                    for seed in (1849058162, 1234567653):
-                        fh = BytesIO(data)
-                        chunker = Chunker(seed, minexp, maxexp, maskbits, winsize)
-                        chunks = [H(c) for c in cf(chunker.chunkify(fh, -1))]
-                        runs.append(H(b"".join(chunks)))
-
-    # The "correct" hash below matches the existing chunker behavior.
-    # Future chunker optimisations must not change this, or existing repos will bloat.
-    overall_hash = H(b"".join(runs))
-    assert overall_hash == hex_to_bin("a43d0ecb3ae24f38852fcc433a83dacd28fe0748d09cc73fc11b69cf3f1a7299")
-
-
-def test_buzhash_chunksize_distribution():
-    data = os.urandom(1048576)
-    min_exp, max_exp, mask = 10, 16, 14  # chunk size target 16kiB, clip at 1kiB and 64kiB
-    chunker = Chunker(0, min_exp, max_exp, mask, 4095)
-    f = BytesIO(data)
-    chunks = cf(chunker.chunkify(f))
-    del chunks[-1]  # get rid of the last chunk, it can be smaller than 2**min_exp
-    chunk_sizes = [len(chunk) for chunk in chunks]
-    chunks_count = len(chunks)
-    min_chunksize_observed = min(chunk_sizes)
-    max_chunksize_observed = max(chunk_sizes)
-    min_count = sum(int(size == 2**min_exp) for size in chunk_sizes)
-    max_count = sum(int(size == 2**max_exp) for size in chunk_sizes)
-    print(
-        f"count: {chunks_count} min: {min_chunksize_observed} max: {max_chunksize_observed} "
-        f"min count: {min_count} max count: {max_count}"
-    )
-    # usually there will about 64 chunks
-    assert 32 < chunks_count < 128
-    # chunks always must be between min and max (clipping must work):
-    assert min_chunksize_observed >= 2**min_exp
-    assert max_chunksize_observed <= 2**max_exp
-    # most chunks should be cut due to buzhash triggering, not due to clipping at min/max size:
-    assert min_count < 10
-    assert max_count < 10

+ 74 - 0
src/borg/testsuite/chunkers/buzhash_self_test.py

@@ -0,0 +1,74 @@
+# Note: these tests are part of the self test, do not use or import pytest functionality here.
+#       See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
+
+from io import BytesIO
+
+from ...chunkers import get_chunker
+from ...chunkers.buzhash import buzhash, buzhash_update, Chunker
+from ...constants import *  # NOQA
+from .. import BaseTestCase
+from . import cf
+
+
class ChunkerTestCase(BaseTestCase):
    """Self-test cases for the buzhash chunker.

    Runs as part of borg's embedded self test (see borg.selftest), so only
    plain unittest-style code is used here -- no pytest features.
    """

    def test_chunkify(self):
        """Pin exact chunk boundaries for several chunker parameterizations.

        Chunker arguments are (seed, min_exp, max_exp, mask_bits, window_size).
        The expected chunk lists encode the precise cut points, so any change
        to the chunking algorithm will be caught here.
        """
        # input of 1.5x the maximum chunk size plus one byte must yield exactly 2 chunks
        data = b"0" * int(1.5 * (1 << CHUNK_MAX_EXP)) + b"Y"
        parts = cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data)))
        self.assert_equal(len(parts), 2)
        # chunking must be lossless: concatenated chunks reproduce the input
        self.assert_equal(b"".join(parts), data)
        # empty input produces no chunks at all
        self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), [])
        # same input, different seeds (0, 1, 2) -> different cut points:
        self.assert_equal(
            cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"fooba", b"rboobaz", b"fooba", b"rboobaz", b"fooba", b"rboobaz"],
        )
        self.assert_equal(
            cf(Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"fo", b"obarb", b"oob", b"azf", b"oobarb", b"oob", b"azf", b"oobarb", b"oobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foob", b"ar", b"boobazfoob", b"ar", b"boobazfoob", b"ar", b"boobaz"],
        )
        # larger min_exp / window_size values change (or suppress) the cut points:
        self.assert_equal(
            cf(Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
        )
        self.assert_equal(
            cf(Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobar", b"boobazfo", b"obar", b"boobazfo", b"obar", b"boobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foob", b"arboobaz", b"foob", b"arboobaz", b"foob", b"arboobaz"],
        )
        self.assert_equal(
            cf(Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
        )
        self.assert_equal(
            cf(Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobarbo", b"obazfoobar", b"boobazfo", b"obarboobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"],
        )

    def test_buzhash(self):
        """Known-answer tests for buzhash and its rolling-update function."""
        self.assert_equal(buzhash(b"abcdefghijklmnop", 0), 3795437769)
        self.assert_equal(buzhash(b"abcdefghijklmnop", 1), 3795400502)
        # rolling update (remove 'X', add 'p') must match a fresh hash of the shifted window
        self.assert_equal(
            buzhash(b"abcdefghijklmnop", 1), buzhash_update(buzhash(b"Xabcdefghijklmno", 1), ord("X"), ord("p"), 16, 1)
        )
        # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
        self.assert_equal(buzhash(b"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", 0), 566521248)

    def test_small_reads(self):
        """Chunker must reassemble input even when read() returns tiny pieces."""

        class SmallReadFile:
            # 21 bytes of input; each read() call below shrinks it by one byte
            # and returns at most the first byte, so reads yield 1 byte (or b"" at EOF).
            input = b"a" * (20 + 1)

            def read(self, nbytes):
                self.input = self.input[:-1]
                return self.input[:1]

        chunker = get_chunker(*CHUNKER_PARAMS, seed=0, sparse=False)
        reconstructed = b"".join(cf(chunker.chunkify(SmallReadFile())))
        # the last byte is dropped before the first read ever returns it, hence 20
        assert reconstructed == b"a" * 20

+ 60 - 65
src/borg/testsuite/chunkers/buzhash_test.py

@@ -1,74 +1,69 @@
-# Note: these tests are part of the self test, do not use or import pytest functionality here.
-#       See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
-
+from hashlib import sha256
 from io import BytesIO
+import os
 
-from ...chunkers import get_chunker
-from ...chunkers.buzhash import buzhash, buzhash_update, Chunker
-from ...constants import *  # NOQA
-from .. import BaseTestCase
 from . import cf
+from ...chunkers import Chunker
+from ...constants import *  # NOQA
+from ...helpers import hex_to_bin
+
+
def H(data):
    """Return the raw SHA-256 digest of *data* (bytes)."""
    hasher = sha256(data)
    return hasher.digest()
+
 
+def test_chunkpoints_unchanged():
+    def twist(size):
+        x = 1
+        a = bytearray(size)
+        for i in range(size):
+            x = (x * 1103515245 + 12345) & 0x7FFFFFFF
+            a[i] = x & 0xFF
+        return a
 
-class ChunkerTestCase(BaseTestCase):
-    def test_chunkify(self):
-        data = b"0" * int(1.5 * (1 << CHUNK_MAX_EXP)) + b"Y"
-        parts = cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data)))
-        self.assert_equal(len(parts), 2)
-        self.assert_equal(b"".join(parts), data)
-        self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), [])
-        self.assert_equal(
-            cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"fooba", b"rboobaz", b"fooba", b"rboobaz", b"fooba", b"rboobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"fo", b"obarb", b"oob", b"azf", b"oobarb", b"oob", b"azf", b"oobarb", b"oobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"foob", b"ar", b"boobazfoob", b"ar", b"boobazfoob", b"ar", b"boobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
-        )
-        self.assert_equal(
-            cf(Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"foobar", b"boobazfo", b"obar", b"boobazfo", b"obar", b"boobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"foob", b"arboobaz", b"foob", b"arboobaz", b"foob", b"arboobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
-        )
-        self.assert_equal(
-            cf(Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"foobarbo", b"obazfoobar", b"boobazfo", b"obarboobaz"],
-        )
-        self.assert_equal(
-            cf(Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
-            [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"],
-        )
+    data = twist(100000)
 
-    def test_buzhash(self):
-        self.assert_equal(buzhash(b"abcdefghijklmnop", 0), 3795437769)
-        self.assert_equal(buzhash(b"abcdefghijklmnop", 1), 3795400502)
-        self.assert_equal(
-            buzhash(b"abcdefghijklmnop", 1), buzhash_update(buzhash(b"Xabcdefghijklmno", 1), ord("X"), ord("p"), 16, 1)
-        )
-        # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
-        self.assert_equal(buzhash(b"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", 0), 566521248)
+    runs = []
+    for winsize in (65, 129, HASH_WINDOW_SIZE, 7351):
+        for minexp in (4, 6, 7, 11, 12):
+            for maxexp in (15, 17):
+                if minexp >= maxexp:
+                    continue
+                for maskbits in (4, 7, 10, 12):
+                    for seed in (1849058162, 1234567653):
+                        fh = BytesIO(data)
+                        chunker = Chunker(seed, minexp, maxexp, maskbits, winsize)
+                        chunks = [H(c) for c in cf(chunker.chunkify(fh, -1))]
+                        runs.append(H(b"".join(chunks)))
 
-    def test_small_reads(self):
-        class SmallReadFile:
-            input = b"a" * (20 + 1)
+    # The "correct" hash below matches the existing chunker behavior.
+    # Future chunker optimisations must not change this, or existing repos will bloat.
+    overall_hash = H(b"".join(runs))
+    assert overall_hash == hex_to_bin("a43d0ecb3ae24f38852fcc433a83dacd28fe0748d09cc73fc11b69cf3f1a7299")
 
-            def read(self, nbytes):
-                self.input = self.input[:-1]
-                return self.input[:1]
 
-        chunker = get_chunker(*CHUNKER_PARAMS, seed=0, sparse=False)
-        reconstructed = b"".join(cf(chunker.chunkify(SmallReadFile())))
-        assert reconstructed == b"a" * 20
def test_buzhash_chunksize_distribution():
    """Statistical sanity check of buzhash chunk sizes on 1 MiB of random data.

    Verifies that all chunk sizes stay within the configured min/max bounds
    and that most cuts are triggered by the rolling hash rather than by
    clipping at the minimum/maximum size.
    """
    data = os.urandom(1048576)
    min_exp, max_exp, mask = 10, 16, 14  # chunk size target 16kiB, clip at 1kiB and 64kiB
    # Chunker(seed, min_exp, max_exp, mask_bits, window_size)
    chunker = Chunker(0, min_exp, max_exp, mask, 4095)
    f = BytesIO(data)
    chunks = cf(chunker.chunkify(f))
    del chunks[-1]  # get rid of the last chunk, it can be smaller than 2**min_exp
    chunk_sizes = [len(chunk) for chunk in chunks]
    chunks_count = len(chunks)
    min_chunksize_observed = min(chunk_sizes)
    max_chunksize_observed = max(chunk_sizes)
    # count chunks that were clipped at exactly the min / max size
    min_count = sum(int(size == 2**min_exp) for size in chunk_sizes)
    max_count = sum(int(size == 2**max_exp) for size in chunk_sizes)
    print(
        f"count: {chunks_count} min: {min_chunksize_observed} max: {max_chunksize_observed} "
        f"min count: {min_count} max count: {max_count}"
    )
    # usually there will be about 64 chunks
    assert 32 < chunks_count < 128
    # chunks always must be between min and max (clipping must work):
    assert min_chunksize_observed >= 2**min_exp
    assert max_chunksize_observed <= 2**max_exp
    # most chunks should be cut due to buzhash triggering, not due to clipping at min/max size:
    assert min_count < 10
    assert max_count < 10

+ 0 - 0
src/borg/testsuite/chunkers/failing_pytest_test.py → src/borg/testsuite/chunkers/failing_test.py


+ 0 - 39
src/borg/testsuite/chunkers/fixed_pytest_test.py

@@ -1,39 +0,0 @@
-import pytest
-
-from . import cf, make_sparsefile, make_content, fs_supports_sparse
-from . import BS, map_sparse1, map_sparse2, map_onlysparse, map_notsparse
-from ...chunkers import ChunkerFixed
-from ...constants import *  # NOQA
-
-
-@pytest.mark.skipif(not fs_supports_sparse(), reason="fs does not support sparse files")
-@pytest.mark.parametrize(
-    "fname, sparse_map, header_size, sparse",
-    [
-        ("sparse1", map_sparse1, 0, False),
-        ("sparse1", map_sparse1, 0, True),
-        ("sparse1", map_sparse1, BS, False),
-        ("sparse1", map_sparse1, BS, True),
-        ("sparse2", map_sparse2, 0, False),
-        ("sparse2", map_sparse2, 0, True),
-        ("sparse2", map_sparse2, BS, False),
-        ("sparse2", map_sparse2, BS, True),
-        ("onlysparse", map_onlysparse, 0, False),
-        ("onlysparse", map_onlysparse, 0, True),
-        ("onlysparse", map_onlysparse, BS, False),
-        ("onlysparse", map_onlysparse, BS, True),
-        ("notsparse", map_notsparse, 0, False),
-        ("notsparse", map_notsparse, 0, True),
-        ("notsparse", map_notsparse, BS, False),
-        ("notsparse", map_notsparse, BS, True),
-    ],
-)
-def test_chunkify_sparse(tmpdir, fname, sparse_map, header_size, sparse):
-    def get_chunks(fname, sparse, header_size):
-        chunker = ChunkerFixed(4096, header_size=header_size, sparse=sparse)
-        with open(fname, "rb") as fd:
-            return cf(chunker.chunkify(fd))
-
-    fn = str(tmpdir / fname)
-    make_sparsefile(fn, sparse_map, header_size=header_size)
-    get_chunks(fn, sparse=sparse, header_size=header_size) == make_content(sparse_map, header_size=header_size)

+ 35 - 55
src/borg/testsuite/chunkers/fixed_test.py

@@ -1,59 +1,39 @@
-from io import BytesIO
+import pytest
 
-from ...chunkers.fixed import ChunkerFixed
+from . import cf, make_sparsefile, make_content, fs_supports_sparse
+from . import BS, map_sparse1, map_sparse2, map_onlysparse, map_notsparse
+from ...chunkers import ChunkerFixed
 from ...constants import *  # NOQA
-from .. import BaseTestCase
-from . import cf
 
 
-class ChunkerFixedTestCase(BaseTestCase):
-    def test_chunkify_just_blocks(self):
-        data = b"foobar" * 1500
-        chunker = ChunkerFixed(4096)
-        parts = cf(chunker.chunkify(BytesIO(data)))
-        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])
-
-    def test_chunkify_header_and_blocks(self):
-        data = b"foobar" * 1500
-        chunker = ChunkerFixed(4096, 123)
-        parts = cf(chunker.chunkify(BytesIO(data)))
-        self.assert_equal(
-            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
-        )
-
-    def test_chunkify_just_blocks_fmap_complete(self):
-        data = b"foobar" * 1500
-        chunker = ChunkerFixed(4096)
-        fmap = [(0, 4096, True), (4096, 8192, True), (8192, 99999999, True)]
-        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
-        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])
-
-    def test_chunkify_header_and_blocks_fmap_complete(self):
-        data = b"foobar" * 1500
-        chunker = ChunkerFixed(4096, 123)
-        fmap = [(0, 123, True), (123, 4096, True), (123 + 4096, 4096, True), (123 + 8192, 4096, True)]
-        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
-        self.assert_equal(
-            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
-        )
-
-    def test_chunkify_header_and_blocks_fmap_zeros(self):
-        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
-        chunker = ChunkerFixed(4096, 123)
-        fmap = [(0, 123, True), (123, 4096, False), (123 + 4096, 4096, True), (123 + 8192, 4096, False)]
-        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
-        # because we marked the '_' ranges as holes, we will get hole ranges instead!
-        self.assert_equal(parts, [data[0:123], 4096, data[123 + 4096 : 123 + 8192], 4096])
-
-    def test_chunkify_header_and_blocks_fmap_partial(self):
-        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
-        chunker = ChunkerFixed(4096, 123)
-        fmap = [
-            (0, 123, True),
-            # (123, 4096, False),
-            (123 + 4096, 4096, True),
-            # (123+8192, 4096, False),
-        ]
-        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
-        # because we left out the '_' ranges from the fmap, we will not get them at all!
-        self.assert_equal(parts, [data[0:123], data[123 + 4096 : 123 + 8192]])
@pytest.mark.skipif(not fs_supports_sparse(), reason="fs does not support sparse files")
@pytest.mark.parametrize(
    "fname, sparse_map, header_size, sparse",
    [
        ("sparse1", map_sparse1, 0, False),
        ("sparse1", map_sparse1, 0, True),
        ("sparse1", map_sparse1, BS, False),
        ("sparse1", map_sparse1, BS, True),
        ("sparse2", map_sparse2, 0, False),
        ("sparse2", map_sparse2, 0, True),
        ("sparse2", map_sparse2, BS, False),
        ("sparse2", map_sparse2, BS, True),
        ("onlysparse", map_onlysparse, 0, False),
        ("onlysparse", map_onlysparse, 0, True),
        ("onlysparse", map_onlysparse, BS, False),
        ("onlysparse", map_onlysparse, BS, True),
        ("notsparse", map_notsparse, 0, False),
        ("notsparse", map_notsparse, 0, True),
        ("notsparse", map_notsparse, BS, False),
        ("notsparse", map_notsparse, BS, True),
    ],
)
def test_chunkify_sparse(tmpdir, fname, sparse_map, header_size, sparse):
    """Chunk sparse files of various hole layouts and verify reconstruction.

    For each (layout, header size, sparse-detection on/off) combination, a
    sparse file is created in ``tmpdir`` and chunked with the fixed-size
    chunker; the chunks must reproduce the expected file content.
    """

    def get_chunks(fname, sparse, header_size):
        # fixed 4 KiB blocks; ``sparse`` toggles hole detection in the chunker
        chunker = ChunkerFixed(4096, header_size=header_size, sparse=sparse)
        with open(fname, "rb") as fd:
            return cf(chunker.chunkify(fd))

    fn = str(tmpdir / fname)
    make_sparsefile(fn, sparse_map, header_size=header_size)
    # Bug fix: the comparison result was previously discarded (missing
    # ``assert``), so this test could never fail.
    assert get_chunks(fn, sparse=sparse, header_size=header_size) == make_content(sparse_map, header_size=header_size)

+ 59 - 0
src/borg/testsuite/chunkers/fixed_ut_test.py

@@ -0,0 +1,59 @@
+from io import BytesIO
+
+from ...chunkers.fixed import ChunkerFixed
+from ...constants import *  # NOQA
+from .. import BaseTestCase
+from . import cf
+
+
class ChunkerFixedTestCase(BaseTestCase):
    """Unit tests for the fixed-block-size chunker (ChunkerFixed)."""

    def test_chunkify_just_blocks(self):
        """Without a header, data is cut into 4096-byte blocks plus a remainder."""
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096)
        parts = cf(chunker.chunkify(BytesIO(data)))
        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])

    def test_chunkify_header_and_blocks(self):
        """A 123-byte header becomes its own chunk, followed by 4096-byte blocks."""
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096, 123)
        parts = cf(chunker.chunkify(BytesIO(data)))
        self.assert_equal(
            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
        )

    def test_chunkify_just_blocks_fmap_complete(self):
        """A complete fmap covering all data yields the same blocks as no fmap."""
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096)
        # fmap entries: (offset, length, is_data); last range extends past EOF
        fmap = [(0, 4096, True), (4096, 8192, True), (8192, 99999999, True)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])

    def test_chunkify_header_and_blocks_fmap_complete(self):
        """Complete fmap together with a header chunk reproduces all data."""
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096, 123)
        fmap = [(0, 123, True), (123, 4096, True), (123 + 4096, 4096, True), (123 + 8192, 4096, True)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        self.assert_equal(
            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
        )

    def test_chunkify_header_and_blocks_fmap_zeros(self):
        """Ranges flagged as holes (is_data=False) are returned as hole lengths."""
        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
        chunker = ChunkerFixed(4096, 123)
        fmap = [(0, 123, True), (123, 4096, False), (123 + 4096, 4096, True), (123 + 8192, 4096, False)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        # because we marked the '_' ranges as holes, we will get hole ranges instead!
        self.assert_equal(parts, [data[0:123], 4096, data[123 + 4096 : 123 + 8192], 4096])

    def test_chunkify_header_and_blocks_fmap_partial(self):
        """Ranges omitted from the fmap are skipped entirely (no chunks, no holes)."""
        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
        chunker = ChunkerFixed(4096, 123)
        fmap = [
            (0, 123, True),
            # (123, 4096, False),
            (123 + 4096, 4096, True),
            # (123+8192, 4096, False),
        ]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        # because we left out the '_' ranges from the fmap, we will not get them at all!
        self.assert_equal(parts, [data[0:123], data[123 + 4096 : 123 + 8192]])

+ 0 - 0
src/borg/testsuite/chunkers/reader_pytest_test.py → src/borg/testsuite/chunkers/reader_test.py