compress.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import os
  2. import zlib
  3. try:
  4. import lzma
  5. except ImportError:
  6. lzma = None
  7. import pytest
  8. from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
  9. buffer = bytes(2**16)
  10. data = b'fooooooooobaaaaaaaar' * 10
  11. params = dict(name='zlib', level=6)
  12. def test_get_compressor():
  13. c = get_compressor(name='none')
  14. assert isinstance(c, CNONE)
  15. c = get_compressor(name='lz4')
  16. assert isinstance(c, LZ4)
  17. c = get_compressor(name='zlib')
  18. assert isinstance(c, ZLIB)
  19. with pytest.raises(KeyError):
  20. get_compressor(name='foobar')
  21. def test_cnull():
  22. c = get_compressor(name='none')
  23. cdata = c.compress(data)
  24. assert len(cdata) > len(data)
  25. assert data in cdata # it's not compressed and just in there 1:1
  26. assert data == c.decompress(cdata)
  27. assert data == Compressor(**params).decompress(cdata) # autodetect
  28. def test_lz4():
  29. c = get_compressor(name='lz4')
  30. cdata = c.compress(data)
  31. assert len(cdata) < len(data)
  32. assert data == c.decompress(cdata)
  33. assert data == Compressor(**params).decompress(cdata) # autodetect
  34. def test_lz4_buffer_allocation(monkeypatch):
  35. # disable fallback to no compression on incompressible data
  36. monkeypatch.setattr(LZ4, 'decide', lambda always_compress: LZ4)
  37. # test with a rather huge data object to see if buffer allocation / resizing works
  38. data = os.urandom(5 * 2**20) * 10 # 50MiB badly compressible data
  39. assert len(data) == 50 * 2**20
  40. c = Compressor('lz4')
  41. cdata = c.compress(data)
  42. assert len(cdata) > len(data)
  43. assert data == c.decompress(cdata)
  44. def test_zlib():
  45. c = get_compressor(name='zlib')
  46. cdata = c.compress(data)
  47. assert len(cdata) < len(data)
  48. assert data == c.decompress(cdata)
  49. assert data == Compressor(**params).decompress(cdata) # autodetect
  50. def test_lzma():
  51. if lzma is None:
  52. pytest.skip("No lzma support found.")
  53. c = get_compressor(name='lzma')
  54. cdata = c.compress(data)
  55. assert len(cdata) < len(data)
  56. assert data == c.decompress(cdata)
  57. assert data == Compressor(**params).decompress(cdata) # autodetect
  58. def test_zstd():
  59. c = get_compressor(name='zstd')
  60. cdata = c.compress(data)
  61. assert len(cdata) < len(data)
  62. assert data == c.decompress(cdata)
  63. assert data == Compressor(**params).decompress(cdata) # autodetect
  64. def test_autodetect_invalid():
  65. with pytest.raises(ValueError):
  66. Compressor(**params).decompress(b'\xff\xfftotalcrap')
  67. with pytest.raises(ValueError):
  68. Compressor(**params).decompress(b'\x08\x00notreallyzlib')
  69. def test_zlib_compat():
  70. # for compatibility reasons, we do not add an extra header for zlib,
  71. # nor do we expect one when decompressing / autodetecting
  72. for level in range(10):
  73. c = get_compressor(name='zlib', level=level)
  74. cdata1 = c.compress(data)
  75. cdata2 = zlib.compress(data, level)
  76. assert cdata1 == cdata2
  77. data2 = c.decompress(cdata2)
  78. assert data == data2
  79. data2 = Compressor(**params).decompress(cdata2)
  80. assert data == data2
  81. def test_compressor():
  82. params_list = [
  83. dict(name='none'),
  84. dict(name='lz4'),
  85. dict(name='zstd', level=1),
  86. dict(name='zstd', level=3),
  87. # avoiding high zstd levels, memory needs unclear
  88. dict(name='zlib', level=0),
  89. dict(name='zlib', level=6),
  90. dict(name='zlib', level=9),
  91. ]
  92. if lzma:
  93. params_list += [
  94. dict(name='lzma', level=0),
  95. dict(name='lzma', level=6),
  96. # we do not test lzma on level 9 because of the huge memory needs
  97. ]
  98. for params in params_list:
  99. c = Compressor(**params)
  100. assert data == c.decompress(c.compress(data))
  101. def test_auto():
  102. compressor_auto_zlib = CompressionSpec('auto,zlib,9').compressor
  103. compressor_lz4 = CompressionSpec('lz4').compressor
  104. compressor_zlib = CompressionSpec('zlib,9').compressor
  105. data = bytes(500)
  106. compressed_auto_zlib = compressor_auto_zlib.compress(data)
  107. compressed_lz4 = compressor_lz4.compress(data)
  108. compressed_zlib = compressor_zlib.compress(data)
  109. ratio = len(compressed_zlib) / len(compressed_lz4)
  110. assert Compressor.detect(compressed_auto_zlib) == ZLIB if ratio < 0.99 else LZ4
  111. data = b'\x00\xb8\xa3\xa2-O\xe1i\xb6\x12\x03\xc21\xf3\x8a\xf78\\\x01\xa5b\x07\x95\xbeE\xf8\xa3\x9ahm\xb1~'
  112. compressed = compressor_auto_zlib.compress(data)
  113. assert Compressor.detect(compressed) == CNONE
  114. def test_obfuscate():
  115. compressor = CompressionSpec('obfuscate,1,none').compressor
  116. data = bytes(10000)
  117. compressed = compressor.compress(data)
  118. # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
  119. assert len(data) + 8 <= len(compressed) <= len(data) * 101 + 8
  120. # compressing 100 times the same data should give at least 50 different result sizes
  121. assert len({len(compressor.compress(data)) for i in range(100)}) > 50
  122. cs = CompressionSpec('obfuscate,2,lz4')
  123. assert isinstance(cs.inner.compressor, LZ4)
  124. compressor = cs.compressor
  125. data = bytes(10000)
  126. compressed = compressor.compress(data)
  127. # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
  128. min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
  129. assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 1001 + 8
  130. # compressing 100 times the same data should give multiple different result sizes
  131. assert len({len(compressor.compress(data)) for i in range(100)}) > 10
  132. cs = CompressionSpec('obfuscate,6,zstd,3')
  133. assert isinstance(cs.inner.compressor, ZSTD)
  134. compressor = cs.compressor
  135. data = bytes(10000)
  136. compressed = compressor.compress(data)
  137. # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
  138. min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
  139. assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 10000001 + 8
  140. # compressing 100 times the same data should give multiple different result sizes
  141. assert len({len(compressor.compress(data)) for i in range(100)}) > 90
  142. cs = CompressionSpec('obfuscate,2,auto,zstd,10')
  143. assert isinstance(cs.inner.compressor, Auto)
  144. compressor = cs.compressor
  145. data = bytes(10000)
  146. compressed = compressor.compress(data)
  147. # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
  148. min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
  149. assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 1001 + 8
  150. # compressing 100 times the same data should give multiple different result sizes
  151. assert len({len(compressor.compress(data)) for i in range(100)}) > 10
  152. cs = CompressionSpec('obfuscate,110,none')
  153. assert isinstance(cs.inner.compressor, CNONE)
  154. compressor = cs.compressor
  155. data = bytes(1000)
  156. compressed = compressor.compress(data)
  157. # N blocks + 2 id bytes obfuscator. 4 length bytes
  158. assert 1000 + 6 <= len(compressed) <= 1000 + 6 + 1024
  159. data = bytes(1100)
  160. compressed = compressor.compress(data)
  161. # N blocks + 2 id bytes obfuscator. 4 length bytes
  162. assert 1100 + 6 <= len(compressed) <= 1100 + 6 + 1024
  163. def test_compression_specs():
  164. with pytest.raises(ValueError):
  165. CompressionSpec('')
  166. assert isinstance(CompressionSpec('none').compressor, CNONE)
  167. assert isinstance(CompressionSpec('lz4').compressor, LZ4)
  168. zlib = CompressionSpec('zlib').compressor
  169. assert isinstance(zlib, ZLIB)
  170. assert zlib.level == 6
  171. zlib = CompressionSpec('zlib,0').compressor
  172. assert isinstance(zlib, ZLIB)
  173. assert zlib.level == 0
  174. zlib = CompressionSpec('zlib,9').compressor
  175. assert isinstance(zlib, ZLIB)
  176. assert zlib.level == 9
  177. with pytest.raises(ValueError):
  178. CompressionSpec('zlib,9,invalid')
  179. lzma = CompressionSpec('lzma').compressor
  180. assert isinstance(lzma, LZMA)
  181. assert lzma.level == 6
  182. lzma = CompressionSpec('lzma,0').compressor
  183. assert isinstance(lzma, LZMA)
  184. assert lzma.level == 0
  185. lzma = CompressionSpec('lzma,9').compressor
  186. assert isinstance(lzma, LZMA)
  187. assert lzma.level == 9
  188. zstd = CompressionSpec('zstd').compressor
  189. assert isinstance(zstd, ZSTD)
  190. assert zstd.level == 3
  191. zstd = CompressionSpec('zstd,1').compressor
  192. assert isinstance(zstd, ZSTD)
  193. assert zstd.level == 1
  194. zstd = CompressionSpec('zstd,22').compressor
  195. assert isinstance(zstd, ZSTD)
  196. assert zstd.level == 22
  197. with pytest.raises(ValueError):
  198. CompressionSpec('lzma,9,invalid')
  199. with pytest.raises(ValueError):
  200. CompressionSpec('invalid')