2
0
Эх сурвалжийг харах

Merge pull request #7620 from bigtedde/parameterize_test_compress

Parametrize compression tests
TW 2 жил өмнө
parent
commit
e70b5b1e94

+ 119 - 178
src/borg/testsuite/compress.py

@@ -2,138 +2,96 @@ import argparse
 import os
 import os
 import zlib
 import zlib
 
 
-try:
-    import lzma
-except ImportError:
-    lzma = None
-
 import pytest
 import pytest
 
 
 from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
 from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
 
 
-
-buffer = bytes(2**16)
-data = b"fooooooooobaaaaaaaar" * 10
+DATA = b"fooooooooobaaaaaaaar" * 10
 params = dict(name="zlib", level=6)
 params = dict(name="zlib", level=6)
 
 
 
 
-def test_get_compressor():
-    c = get_compressor(name="none")
-    assert isinstance(c, CNONE)
-    c = get_compressor(name="lz4")
-    assert isinstance(c, LZ4)
-    c = get_compressor(name="zlib")
-    assert isinstance(c, ZLIB)
-    with pytest.raises(KeyError):
-        get_compressor(name="foobar")
-
-
-def test_cnull():
-    c = get_compressor(name="none")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) >= len(data)
-    assert data in cdata  # it's not compressed and just in there 1:1
-    assert data == c.decompress(meta, cdata)[1]
-    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
+@pytest.mark.parametrize(
+    "c_type, expected_compressor",
+    [("none", CNONE), ("lz4", LZ4), ("zlib", ZLIB), ("lzma", LZMA), ("zstd", ZSTD), ("foobar", None)],
+)
+def test_get_compressor(c_type, expected_compressor):
+    if expected_compressor is not None:
+        compressor = get_compressor(name=c_type)
+        assert isinstance(compressor, expected_compressor)
+    else:
+        with pytest.raises(KeyError):
+            get_compressor(name=c_type)
 
 
 
 
-def test_lz4():
-    c = get_compressor(name="lz4")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) < len(data)
-    assert data == c.decompress(meta, cdata)[1]
-    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
+@pytest.mark.parametrize("c_type", ["none", "lz4", "zlib", "zstd", "lzma"])
+def test_compression_types(c_type):
+    c = get_compressor(name=c_type)
+    meta, cdata = c.compress({}, DATA)
+    if c_type == "none":
+        assert len(cdata) >= len(DATA)  # it's not compressed and just in there 1:1
+    else:
+        assert len(cdata) < len(DATA)
+    assert DATA == c.decompress(meta, cdata)[1]
+    assert DATA == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 
 
 def test_lz4_buffer_allocation(monkeypatch):
 def test_lz4_buffer_allocation(monkeypatch):
     # disable fallback to no compression on incompressible data
     # disable fallback to no compression on incompressible data
     monkeypatch.setattr(LZ4, "decide", lambda always_compress: LZ4)
     monkeypatch.setattr(LZ4, "decide", lambda always_compress: LZ4)
     # test with a rather huge data object to see if buffer allocation / resizing works
     # test with a rather huge data object to see if buffer allocation / resizing works
-    data = os.urandom(5 * 2**20) * 10  # 50MiB badly compressible data
-    assert len(data) == 50 * 2**20
+    incompressible_data = os.urandom(5 * 2**20) * 10  # 50MiB badly compressible data
     c = Compressor("lz4")
     c = Compressor("lz4")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) >= len(data)
-    assert data == c.decompress(meta, cdata)[1]
-
-
-def test_zlib():
-    c = get_compressor(name="zlib")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) < len(data)
-    assert data == c.decompress(meta, cdata)[1]
-    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
-
-
-def test_lzma():
-    if lzma is None:
-        pytest.skip("No lzma support found.")
-    c = get_compressor(name="lzma")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) < len(data)
-    assert data == c.decompress(meta, cdata)[1]
-    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
+    meta, cdata = c.compress({}, incompressible_data)
+    assert len(incompressible_data) == 50 * 2**20
+    assert len(cdata) >= len(incompressible_data)
+    assert incompressible_data == c.decompress(meta, cdata)[1]
 
 
 
 
-def test_zstd():
-    c = get_compressor(name="zstd")
-    meta, cdata = c.compress({}, data)
-    assert len(cdata) < len(data)
-    assert data == c.decompress(meta, cdata)[1]
-    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
-
-
-def test_autodetect_invalid():
+@pytest.mark.parametrize("invalid_cdata", [b"\xff\xfftotalcrap", b"\x08\x00notreallyzlib"])
+def test_autodetect_invalid(invalid_cdata):
     with pytest.raises(ValueError):
     with pytest.raises(ValueError):
-        Compressor(**params, legacy_mode=True).decompress({}, b"\xff\xfftotalcrap")
-    with pytest.raises(ValueError):
-        Compressor(**params, legacy_mode=True).decompress({}, b"\x08\x00notreallyzlib")
+        Compressor(**params, legacy_mode=True).decompress({}, invalid_cdata)
 
 
 
 
 def test_zlib_legacy_compat():
 def test_zlib_legacy_compat():
     # for compatibility reasons, we do not add an extra header for zlib,
     # for compatibility reasons, we do not add an extra header for zlib,
-    # nor do we expect one when decompressing / autodetecting
+    # nor do we expect one when decompressing / auto-detecting
     for level in range(10):
     for level in range(10):
         c = get_compressor(name="zlib_legacy", level=level, legacy_mode=True)
         c = get_compressor(name="zlib_legacy", level=level, legacy_mode=True)
-        meta1, cdata1 = c.compress({}, data)
-        cdata2 = zlib.compress(data, level)
+        meta1, cdata1 = c.compress({}, DATA)
+        cdata2 = zlib.compress(DATA, level)
         assert cdata1 == cdata2
         assert cdata1 == cdata2
         meta2, data2 = c.decompress({}, cdata2)
         meta2, data2 = c.decompress({}, cdata2)
-        assert data == data2
-        # _, data2 = Compressor(**params).decompress({}, cdata2)
-        # assert data == data2
+        assert DATA == data2
 
 
 
 
-def test_compressor():
-    params_list = [
+@pytest.mark.parametrize(
+    "c_params",
+    [
         dict(name="none"),
         dict(name="none"),
         dict(name="lz4"),
         dict(name="lz4"),
         dict(name="zstd", level=1),
         dict(name="zstd", level=1),
-        dict(name="zstd", level=3),
-        # avoiding high zstd levels, memory needs unclear
+        dict(name="zstd", level=3),  # avoiding high zstd levels, memory needs unclear
         dict(name="zlib", level=0),
         dict(name="zlib", level=0),
         dict(name="zlib", level=6),
         dict(name="zlib", level=6),
         dict(name="zlib", level=9),
         dict(name="zlib", level=9),
-    ]
-    if lzma:
-        params_list += [
-            dict(name="lzma", level=0),
-            dict(name="lzma", level=6),
-            # we do not test lzma on level 9 because of the huge memory needs
-        ]
-    for params in params_list:
-        c = Compressor(**params)
-        meta_c, data_compressed = c.compress({}, data)
-        assert "ctype" in meta_c
-        assert "clevel" in meta_c
-        assert meta_c["csize"] == len(data_compressed)
-        assert meta_c["size"] == len(data)
-        meta_d, data_decompressed = c.decompress(meta_c, data_compressed)
-        assert data == data_decompressed
-        assert "ctype" in meta_d
-        assert "clevel" in meta_d
-        assert meta_d["csize"] == len(data_compressed)
-        assert meta_d["size"] == len(data)
+        dict(name="lzma", level=0),
+        dict(name="lzma", level=6),  # we do not test lzma on level 9 because of the huge memory needs
+    ],
+)
+def test_compressor(c_params):
+    c = Compressor(**c_params)
+    meta_c, data_compressed = c.compress({}, DATA)
+    assert "ctype" in meta_c
+    assert "clevel" in meta_c
+    assert meta_c["csize"] == len(data_compressed)
+    assert meta_c["size"] == len(DATA)
+    meta_d, data_decompressed = c.decompress(meta_c, data_compressed)
+    assert DATA == data_decompressed
+    assert "ctype" in meta_d
+    assert "clevel" in meta_d
+    assert meta_d["csize"] == len(data_compressed)
+    assert meta_d["size"] == len(DATA)
 
 
 
 
 def test_auto():
 def test_auto():
@@ -157,60 +115,60 @@ def test_auto():
     assert meta["csize"] == len(compressed)
     assert meta["csize"] == len(compressed)
 
 
 
 
-def test_obfuscate():
-    compressor = CompressionSpec("obfuscate,1,none").compressor
-    data = bytes(10000)
-    _, compressed = compressor.compress({}, data)
-    assert len(data) <= len(compressed) <= len(data) * 101
-    # compressing 100 times the same data should give at least 50 different result sizes
-    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 50
-
-    cs = CompressionSpec("obfuscate,2,lz4")
-    assert isinstance(cs.inner.compressor, LZ4)
+@pytest.mark.parametrize(
+    "specs, c_type, result_range, obfuscation_factor",
+    [
+        ("obfuscate,1,none", CNONE, 50, 10**1),
+        ("obfuscate,2,lz4", LZ4, 10, 10**2),
+        ("obfuscate,6,zstd,3", ZSTD, 90, 10**6),
+        ("obfuscate,2,auto,zstd,10", Auto, 10, 10**2),
+    ],
+)
+def test_factor_obfuscation(specs, c_type, result_range, obfuscation_factor: int):
+    # Testing relative random reciprocal size variation, obfuscation spec 1 to 6 inclusive
+    # obfuscate_factor = 10**(obfuscation spec)
+    cs = CompressionSpec(specs)
+    assert isinstance(cs.inner.compressor, c_type)
     compressor = cs.compressor
     compressor = cs.compressor
     data = bytes(10000)
     data = bytes(10000)
     _, compressed = compressor.compress({}, data)
     _, compressed = compressor.compress({}, data)
-    min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
+    if c_type is CNONE:  # no compression
+        assert len(data) <= len(compressed) <= len(data) * (10 * obfuscation_factor) + 1
+    else:  # with compression
+        min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
+        assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * (10 * obfuscation_factor) + 1
+    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > result_range
     # compressing 100 times the same data should give multiple different result sizes
     # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
 
 
-    cs = CompressionSpec("obfuscate,6,zstd,3")
-    assert isinstance(cs.inner.compressor, ZSTD)
-    compressor = cs.compressor
-    data = bytes(10000)
-    _, compressed = compressor.compress({}, data)
-    min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 10000001
-    # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 90
-
-    cs = CompressionSpec("obfuscate,2,auto,zstd,10")
-    assert isinstance(cs.inner.compressor, Auto)
-    compressor = cs.compressor
-    data = bytes(10000)
-    _, compressed = compressor.compress({}, data)
-    min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
-    # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
 
 
-    cs = CompressionSpec("obfuscate,110,none")
-    assert isinstance(cs.inner.compressor, CNONE)
+@pytest.mark.parametrize(
+    "specs, c_type, obfuscation_padding",
+    [
+        ("obfuscate,110,none", CNONE, 2**10),  # up to 1KiB padding
+        ("obfuscate,120,lz4", LZ4, 2**20),  # up to 1MiB padding
+        ("obfuscate,123,zstd,3", ZSTD, 2**23),  # max, up to 8MiB padding
+    ],
+)
+def test_additive_obfuscation(specs, c_type, obfuscation_padding: int):
+    # Testing randomly sized padding, obfuscation spec 110 to 123 inclusive
+    # obfuscate_padding = 2 ** (obfuscation spec - 100)
+    cs = CompressionSpec(specs)
+    assert isinstance(cs.inner.compressor, c_type)
     compressor = cs.compressor
     compressor = cs.compressor
-    data = bytes(1000)
-    _, compressed = compressor.compress({}, data)
-    assert 1000 <= len(compressed) <= 1000 + 1024
-    data = bytes(1100)
-    _, compressed = compressor.compress({}, data)
-    assert 1100 <= len(compressed) <= 1100 + 1024
+    data_list = (bytes(1000), bytes(1100))
+    for data in data_list:
+        _, compressed = compressor.compress({}, data)
+        if c_type is CNONE:  # no compression
+            assert len(data) <= len(compressed) <= len(data) + obfuscation_padding
+        else:  # with compression
+            min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
+            assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * obfuscation_padding
 
 
 
 
 def test_obfuscate_meta():
 def test_obfuscate_meta():
     compressor = CompressionSpec("obfuscate,3,lz4").compressor
     compressor = CompressionSpec("obfuscate,3,lz4").compressor
-    meta = {}
     data = bytes(10000)
     data = bytes(10000)
-    meta, compressed = compressor.compress(meta, data)
+    meta, compressed = compressor.compress({}, data)
     assert "ctype" in meta
     assert "ctype" in meta
     assert meta["ctype"] == LZ4.ID
     assert meta["ctype"] == LZ4.ID
     assert "clevel" in meta
     assert "clevel" in meta
@@ -221,51 +179,34 @@ def test_obfuscate_meta():
     assert "psize" in meta
     assert "psize" in meta
     psize = meta["psize"]
     psize = meta["psize"]
     assert 0 < psize < 100
     assert 0 < psize < 100
-    assert csize - psize >= 0  # there is a obfuscation trailer
+    assert csize - psize >= 0  # there is an obfuscation trailer
     trailer = compressed[psize:]
     trailer = compressed[psize:]
     assert not trailer or set(trailer) == {0}  # trailer is all-zero-bytes
     assert not trailer or set(trailer) == {0}  # trailer is all-zero-bytes
 
 
 
 
-def test_compression_specs():
-    with pytest.raises(argparse.ArgumentTypeError):
-        CompressionSpec("")
+@pytest.mark.parametrize(
+    "c_type, c_name", [(CNONE, "none"), (LZ4, "lz4"), (ZLIB, "zlib"), (LZMA, "lzma"), (ZSTD, "zstd")]
+)
+def test_default_compression_level(c_type, c_name):
+    cs = CompressionSpec(c_name).compressor
+    assert isinstance(cs, c_type)
+    if c_type in (ZLIB, LZMA):
+        assert cs.level == 6
+    elif c_type is ZSTD:
+        assert cs.level == 3
 
 
-    assert isinstance(CompressionSpec("none").compressor, CNONE)
-    assert isinstance(CompressionSpec("lz4").compressor, LZ4)
 
 
-    zlib = CompressionSpec("zlib").compressor
-    assert isinstance(zlib, ZLIB)
-    assert zlib.level == 6
-    zlib = CompressionSpec("zlib,0").compressor
-    assert isinstance(zlib, ZLIB)
-    assert zlib.level == 0
-    zlib = CompressionSpec("zlib,9").compressor
-    assert isinstance(zlib, ZLIB)
-    assert zlib.level == 9
-    with pytest.raises(argparse.ArgumentTypeError):
-        CompressionSpec("zlib,9,invalid")
-
-    lzma = CompressionSpec("lzma").compressor
-    assert isinstance(lzma, LZMA)
-    assert lzma.level == 6
-    lzma = CompressionSpec("lzma,0").compressor
-    assert isinstance(lzma, LZMA)
-    assert lzma.level == 0
-    lzma = CompressionSpec("lzma,9").compressor
-    assert isinstance(lzma, LZMA)
-    assert lzma.level == 9
+@pytest.mark.parametrize(
+    "c_type, c_name, c_levels", [(ZLIB, "zlib", [0, 9]), (LZMA, "lzma", [0, 9]), (ZSTD, "zstd", [1, 22])]
+)
+def test_specified_compression_level(c_type, c_name, c_levels):
+    for level in c_levels:
+        cs = CompressionSpec(f"{c_name},{level}").compressor
+        assert isinstance(cs, c_type)
+        assert cs.level == level
 
 
-    zstd = CompressionSpec("zstd").compressor
-    assert isinstance(zstd, ZSTD)
-    assert zstd.level == 3
-    zstd = CompressionSpec("zstd,1").compressor
-    assert isinstance(zstd, ZSTD)
-    assert zstd.level == 1
-    zstd = CompressionSpec("zstd,22").compressor
-    assert isinstance(zstd, ZSTD)
-    assert zstd.level == 22
 
 
+@pytest.mark.parametrize("invalid_spec", ["", "lzma,9,invalid", "invalid"])
+def test_invalid_compression_level(invalid_spec):
     with pytest.raises(argparse.ArgumentTypeError):
     with pytest.raises(argparse.ArgumentTypeError):
-        CompressionSpec("lzma,9,invalid")
-    with pytest.raises(argparse.ArgumentTypeError):
-        CompressionSpec("invalid")
+        CompressionSpec(invalid_spec)