123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- import os
- import zlib
- try:
- import lzma
- except ImportError:
- lzma = None
- import pytest
- from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
# 64 KiB of zero bytes.
buffer = bytes(1 << 16)
# Small, highly repetitive payload shared by the round-trip tests below.
data = 10 * b'fooooooooobaaaaaaaar'
# Keyword arguments for the Compressor instances used to check format autodetection.
params = {'name': 'zlib', 'level': 6}
def test_get_compressor():
    """get_compressor returns an instance of the class matching each known name."""
    name_to_class = (('none', CNONE), ('lz4', LZ4), ('zlib', ZLIB))
    for name, expected_class in name_to_class:
        assert isinstance(get_compressor(name=name), expected_class)
    # an unknown name is reported as a KeyError
    with pytest.raises(KeyError):
        get_compressor(name='foobar')
def test_cnull():
    """The 'none' compressor keeps the payload verbatim inside a larger output."""
    compressor = get_compressor(name='none')
    stored = compressor.compress(data)
    # it's not compressed and just in there 1:1, so the output is longer than the input
    assert len(stored) > len(data)
    assert data in stored
    assert compressor.decompress(stored) == data
    # a Compressor configured with other params must autodetect the format
    autodetecting = Compressor(**params)
    assert autodetecting.decompress(stored) == data
def test_lz4():
    """lz4 shrinks the repetitive payload and round-trips it."""
    compressor = get_compressor(name='lz4')
    packed = compressor.compress(data)
    assert len(packed) < len(data)
    assert compressor.decompress(packed) == data
    # a Compressor configured with other params must autodetect the format
    autodetecting = Compressor(**params)
    assert autodetecting.decompress(packed) == data
def test_lz4_buffer_allocation(monkeypatch):
    """Round-trip a 50 MiB, badly compressible payload through the lz4 compressor."""
    # disable fallback to no compression on incompressible data
    # NOTE(review): the replacement accepts exactly one positional argument;
    # confirm this matches how LZ4.decide is actually invoked.
    def _always_lz4(always_compress):
        return LZ4

    monkeypatch.setattr(LZ4, 'decide', _always_lz4)

    # test with a rather huge data object to see if buffer allocation / resizing works
    mib = 2**20
    random_chunk = os.urandom(5 * mib)
    data = random_chunk * 10  # 50MiB badly compressible data
    assert len(data) == 50 * mib

    compressor = Compressor('lz4')
    packed = compressor.compress(data)
    assert len(packed) > len(data)
    assert compressor.decompress(packed) == data
def test_zlib():
    """zlib shrinks the repetitive payload and round-trips it."""
    compressor = get_compressor(name='zlib')
    packed = compressor.compress(data)
    assert len(packed) < len(data)
    assert compressor.decompress(packed) == data
    # a generic Compressor must autodetect the format
    autodetecting = Compressor(**params)
    assert autodetecting.decompress(packed) == data
def test_lzma():
    """lzma shrinks the repetitive payload and round-trips it (skipped without lzma)."""
    lzma_missing = lzma is None
    if lzma_missing:
        pytest.skip("No lzma support found.")
    compressor = get_compressor(name='lzma')
    packed = compressor.compress(data)
    assert len(packed) < len(data)
    assert compressor.decompress(packed) == data
    # a Compressor configured with other params must autodetect the format
    autodetecting = Compressor(**params)
    assert autodetecting.decompress(packed) == data
def test_zstd():
    """zstd shrinks the repetitive payload and round-trips it."""
    compressor = get_compressor(name='zstd')
    packed = compressor.compress(data)
    assert len(packed) < len(data)
    assert compressor.decompress(packed) == data
    # a Compressor configured with other params must autodetect the format
    autodetecting = Compressor(**params)
    assert autodetecting.decompress(packed) == data
def test_autodetect_invalid():
    """Decompressing input with an unrecognised or bogus prefix raises ValueError."""
    bogus_inputs = (
        b'\xff\xfftotalcrap',
        b'\x08\x00notreallyzlib',
    )
    for bogus in bogus_inputs:
        with pytest.raises(ValueError):
            Compressor(**params).decompress(bogus)
def test_zlib_compat():
    """The zlib compressor output is byte-identical to plain zlib.compress at every level."""
    # for compatibility reasons, we do not add an extra header for zlib,
    # nor do we expect one when decompressing / autodetecting
    for level in range(10):
        compressor = get_compressor(name='zlib', level=level)
        ours = compressor.compress(data)
        reference = zlib.compress(data, level)
        assert ours == reference
        # the reference stream decompresses through the same compressor ...
        roundtrip = compressor.decompress(reference)
        assert roundtrip == data
        # ... and through an autodetecting Compressor
        roundtrip = Compressor(**params).decompress(reference)
        assert roundtrip == data
def test_compressor():
    """Every listed compressor configuration round-trips the payload."""
    configurations = [
        {'name': 'none'},
        {'name': 'lz4'},
        {'name': 'zstd', 'level': 1},
        {'name': 'zstd', 'level': 3},
        # avoiding high zstd levels, memory needs unclear
        {'name': 'zlib', 'level': 0},
        {'name': 'zlib', 'level': 6},
        {'name': 'zlib', 'level': 9},
    ]
    if lzma:
        configurations.extend([
            {'name': 'lzma', 'level': 0},
            {'name': 'lzma', 'level': 6},
            # we do not test lzma on level 9 because of the huge memory needs
        ])
    for configuration in configurations:
        compressor = Compressor(**configuration)
        packed = compressor.compress(data)
        assert compressor.decompress(packed) == data
def test_auto():
    """Check which format the 'auto,zlib,9' compressor emits.

    For compressible input the output must be detected as ZLIB when zlib beats
    lz4 by a ratio below 0.99, otherwise as LZ4. For the incompressible sample
    the output must be detected as CNONE.
    """
    compressor_auto_zlib = CompressionSpec('auto,zlib,9').compressor
    compressor_lz4 = CompressionSpec('lz4').compressor
    compressor_zlib = CompressionSpec('zlib,9').compressor

    data = bytes(500)
    compressed_auto_zlib = compressor_auto_zlib.compress(data)
    compressed_lz4 = compressor_lz4.compress(data)
    compressed_zlib = compressor_zlib.compress(data)
    ratio = len(compressed_zlib) / len(compressed_lz4)
    # Choose the expected class first. Written inline, `assert a == B if c else C`
    # parses as `assert (a == B) if c else C`, so the else branch would assert a
    # bare (always truthy) class and verify nothing.
    expected = ZLIB if ratio < 0.99 else LZ4
    assert Compressor.detect(compressed_auto_zlib) == expected

    data = b'\x00\xb8\xa3\xa2-O\xe1i\xb6\x12\x03\xc21\xf3\x8a\xf78\\\x01\xa5b\x07\x95\xbeE\xf8\xa3\x9ahm\xb1~'
    compressed = compressor_auto_zlib.compress(data)
    assert Compressor.detect(compressed) == CNONE
def test_obfuscate():
    """Check output size bounds and size variation for several 'obfuscate,...' specs."""

    def distinct_sizes(comp, blob):
        # number of different output lengths over 100 compressions of the same input
        return len({len(comp.compress(blob)) for _ in range(100)})

    compressor = CompressionSpec('obfuscate,1,none').compressor
    data = bytes(10000)
    compressed = compressor.compress(data)
    # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
    assert len(data) + 8 <= len(compressed) <= len(data) * 101 + 8
    # compressing 100 times the same data should give at least 50 different result sizes
    assert distinct_sizes(compressor, data) > 50

    # (spec, inner compressor class, upper-bound multiplier, minimum distinct sizes)
    compressing_cases = (
        ('obfuscate,2,lz4', LZ4, 1001, 10),
        ('obfuscate,6,zstd,3', ZSTD, 10000001, 90),
        ('obfuscate,2,auto,zstd,10', Auto, 1001, 10),
    )
    for spec, inner_class, multiplier, min_distinct in compressing_cases:
        cs = CompressionSpec(spec)
        assert isinstance(cs.inner.compressor, inner_class)
        compressor = cs.compressor
        data = bytes(10000)
        compressed = compressor.compress(data)
        # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
        min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
        lower_bound = max_compress * len(data) + 8
        upper_bound = min_compress * len(data) * multiplier + 8
        assert lower_bound <= len(compressed) <= upper_bound
        # compressing 100 times the same data should give multiple different result sizes
        assert distinct_sizes(compressor, data) > min_distinct

    cs = CompressionSpec('obfuscate,110,none')
    assert isinstance(cs.inner.compressor, CNONE)
    compressor = cs.compressor
    for size in (1000, 1100):
        data = bytes(size)
        compressed = compressor.compress(data)
        # N blocks + 2 id bytes obfuscator. 4 length bytes
        assert size + 6 <= len(compressed) <= size + 6 + 1024
def test_compression_specs():
    """CompressionSpec parses valid specs into the right compressor and level.

    Empty, unknown and over-long specs must raise ValueError. Local names are
    chosen so they do not shadow the imported ``zlib`` / ``lzma`` modules, and
    the lzma level checks only run when lzma is available, matching the other
    lzma tests in this module.
    """
    with pytest.raises(ValueError):
        CompressionSpec('')

    assert isinstance(CompressionSpec('none').compressor, CNONE)
    assert isinstance(CompressionSpec('lz4').compressor, LZ4)

    for spec, expected_level in (('zlib', 6), ('zlib,0', 0), ('zlib,9', 9)):
        compressor = CompressionSpec(spec).compressor
        assert isinstance(compressor, ZLIB)
        assert compressor.level == expected_level
    with pytest.raises(ValueError):
        CompressionSpec('zlib,9,invalid')

    # `lzma` here is the module-level import, which is None when unavailable
    if lzma is not None:
        for spec, expected_level in (('lzma', 6), ('lzma,0', 0), ('lzma,9', 9)):
            compressor = CompressionSpec(spec).compressor
            assert isinstance(compressor, LZMA)
            assert compressor.level == expected_level

    for spec, expected_level in (('zstd', 3), ('zstd,1', 1), ('zstd,22', 22)):
        compressor = CompressionSpec(spec).compressor
        assert isinstance(compressor, ZSTD)
        assert compressor.level == expected_level

    with pytest.raises(ValueError):
        CompressionSpec('lzma,9,invalid')
    with pytest.raises(ValueError):
        CompressionSpec('invalid')
|