|
@@ -5,6 +5,7 @@ import zlib
|
|
|
import pytest
|
|
|
|
|
|
from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
|
|
|
+from ..constants import ROBJ_FILE_STREAM, ROBJ_ARCHIVE_META
|
|
|
|
|
|
DATA = b"fooooooooobaaaaaaaar" * 10
|
|
|
params = dict(name="zlib", level=6)
|
|
@@ -131,13 +132,13 @@ def test_factor_obfuscation(specs, c_type, result_range, obfuscation_factor: int
|
|
|
assert isinstance(cs.inner.compressor, c_type)
|
|
|
compressor = cs.compressor
|
|
|
data = bytes(10000)
|
|
|
- _, compressed = compressor.compress({}, data)
|
|
|
+ _, compressed = compressor.compress(dict(type=ROBJ_FILE_STREAM), data)
|
|
|
if c_type is CNONE: # no compression
|
|
|
assert len(data) <= len(compressed) <= len(data) * (10 * obfuscation_factor) + 1
|
|
|
else: # with compression
|
|
|
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
|
|
|
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * (10 * obfuscation_factor) + 1
|
|
|
- assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > result_range
|
|
|
+ assert len({len(compressor.compress(dict(type=ROBJ_FILE_STREAM), data)[1]) for i in range(100)}) > result_range
|
|
|
# compressing 100 times the same data should give multiple different result sizes
|
|
|
|
|
|
|
|
@@ -157,7 +158,7 @@ def test_additive_obfuscation(specs, c_type, obfuscation_padding: int):
|
|
|
compressor = cs.compressor
|
|
|
data_list = (bytes(1000), bytes(1100))
|
|
|
for data in data_list:
|
|
|
- _, compressed = compressor.compress({}, data)
|
|
|
+ _, compressed = compressor.compress(dict(type=ROBJ_FILE_STREAM), data)
|
|
|
if c_type is CNONE: # no compression
|
|
|
assert len(data) <= len(compressed) <= len(data) + obfuscation_padding
|
|
|
else: # with compression
|
|
@@ -168,7 +169,7 @@ def test_additive_obfuscation(specs, c_type, obfuscation_padding: int):
|
|
|
def test_obfuscate_meta():
|
|
|
compressor = CompressionSpec("obfuscate,3,lz4").compressor
|
|
|
data = bytes(10000)
|
|
|
- meta, compressed = compressor.compress({}, data)
|
|
|
+ meta, compressed = compressor.compress(dict(type=ROBJ_FILE_STREAM), data)
|
|
|
assert "ctype" in meta
|
|
|
assert meta["ctype"] == LZ4.ID
|
|
|
assert "clevel" in meta
|
|
@@ -233,7 +234,26 @@ def test_invalid_compression_level(invalid_spec):
|
|
|
def test_padme_obfuscation(data_length, expected_padding):
|
|
|
compressor = Compressor(name="obfuscate", level=250, compressor=Compressor("none"))
|
|
|
data = b"x" * data_length
|
|
|
- meta, compressed = compressor.compress({}, data)
|
|
|
+ meta, compressed = compressor.compress(dict(type=ROBJ_FILE_STREAM), data)
|
|
|
+
|
|
|
+ expected_padded_size = data_length + expected_padding
|
|
|
+
|
|
|
+ assert (
|
|
|
+ len(compressed) == expected_padded_size
|
|
|
+ ), f"For {data_length}, expected {expected_padded_size}, got {len(compressed)}"
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.parametrize(
|
|
|
+ "data_length, expected_padding, robj_type",
|
|
|
+ [
|
|
|
+ (1000000, 15808, ROBJ_FILE_STREAM), # we want to obfuscate file content chunk sizes
|
|
|
+ (1000000, 0, ROBJ_ARCHIVE_META), # we do not want to obfuscate metadata chunk sizes
|
|
|
+ ],
|
|
|
+)
|
|
|
+def test_robj_specific_obfuscation(data_length, expected_padding, robj_type):
|
|
|
+ compressor = Compressor(name="obfuscate", level=250, compressor=Compressor("none"))
|
|
|
+ data = b"x" * data_length
|
|
|
+ meta, compressed = compressor.compress(dict(type=robj_type), data)
|
|
|
|
|
|
expected_padded_size = data_length + expected_padding
|
|
|
|