compress.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import zlib
  2. try:
  3. import lzma
  4. except ImportError:
  5. lzma = None
  6. import pytest
  7. from ..compress import get_compressor, Compressor, CNULL, ZLIB, LZ4
  8. buffer = bytes(2**16)
  9. data = b'fooooooooobaaaaaaaar' * 10
  10. params = dict(name='zlib', level=6, buffer=buffer)
  11. def test_get_compressor():
  12. c = get_compressor(name='null')
  13. assert isinstance(c, CNULL)
  14. c = get_compressor(name='lz4', buffer=buffer)
  15. assert isinstance(c, LZ4)
  16. c = get_compressor(name='zlib')
  17. assert isinstance(c, ZLIB)
  18. with pytest.raises(KeyError):
  19. get_compressor(name='foobar')
  20. def test_cnull():
  21. c = get_compressor(name='null')
  22. cdata = c.compress(data)
  23. assert len(cdata) > len(data)
  24. assert data in cdata # it's not compressed and just in there 1:1
  25. assert data == c.decompress(cdata)
  26. assert data == Compressor(**params).decompress(cdata) # autodetect
  27. def test_lz4():
  28. c = get_compressor(name='lz4', buffer=buffer)
  29. cdata = c.compress(data)
  30. assert len(cdata) < len(data)
  31. assert data == c.decompress(cdata)
  32. assert data == Compressor(**params).decompress(cdata) # autodetect
  33. def test_zlib():
  34. c = get_compressor(name='zlib')
  35. cdata = c.compress(data)
  36. assert len(cdata) < len(data)
  37. assert data == c.decompress(cdata)
  38. assert data == Compressor(**params).decompress(cdata) # autodetect
  39. def test_lzma():
  40. if lzma is None:
  41. pytest.skip("No lzma support found.")
  42. c = get_compressor(name='lzma')
  43. cdata = c.compress(data)
  44. assert len(cdata) < len(data)
  45. assert data == c.decompress(cdata)
  46. assert data == Compressor(**params).decompress(cdata) # autodetect
  47. def test_autodetect_invalid():
  48. with pytest.raises(ValueError):
  49. Compressor(**params).decompress(b'\xff\xfftotalcrap')
  50. with pytest.raises(ValueError):
  51. Compressor(**params).decompress(b'\x08\x00notreallyzlib')
  52. def test_zlib_compat():
  53. # for compatibility reasons, we do not add an extra header for zlib,
  54. # nor do we expect one when decompressing / autodetecting
  55. for level in range(10):
  56. c = get_compressor(name='zlib', level=level)
  57. cdata1 = c.compress(data)
  58. cdata2 = zlib.compress(data, level)
  59. assert cdata1 == cdata2
  60. data2 = c.decompress(cdata2)
  61. assert data == data2
  62. data2 = Compressor(**params).decompress(cdata2)
  63. assert data == data2
  64. def test_compressor():
  65. params_list = [
  66. dict(name='null', buffer=buffer),
  67. dict(name='lz4', buffer=buffer),
  68. dict(name='zlib', level=0, buffer=buffer),
  69. dict(name='zlib', level=6, buffer=buffer),
  70. dict(name='zlib', level=9, buffer=buffer),
  71. ]
  72. if lzma:
  73. params_list += [
  74. dict(name='lzma', level=0, buffer=buffer),
  75. dict(name='lzma', level=6, buffer=buffer),
  76. dict(name='lzma', level=9, buffer=buffer),
  77. ]
  78. for params in params_list:
  79. c = Compressor(**params)
  80. assert data == c.decompress(c.compress(data))