
Integrate compression code; new compression spec parser for the command line.

New null and lz4 compression.
Giving -C 0 now selects null compression, no longer zlib level 0
(null has almost zero overhead, while zlib level 0 still had to wrap everything in zlib frames).
Giving -C 10 selects the new lz4 compression: very fast compression and even faster decompression.
See borg create --help (and the --compression argument).
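
For example (illustrative invocations only; repository and path arguments are placeholders), the spellings accepted by the new -C / --compression parser look like this:

    borg create -C 0      REPO::ARCHIVE PATH   # null compression (default)
    borg create -C 10     REPO::ARCHIVE PATH   # lz4
    borg create -C lz4    REPO::ARCHIVE PATH   # same as -C 10, by name
    borg create -C zlib   REPO::ARCHIVE PATH   # zlib, default level 6
    borg create -C zlib,9 REPO::ARCHIVE PATH   # zlib, level 9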

Fix some issues, clean up, optimize:
- CNULL: always return bytes
- LZ4: cope with memoryview input
- Compressor: pass bytes to detect(), avoid memoryviews
- LZ4: always reuse the same COMPR_BUFFER, avoid memory management costs
- check the --chunker-params CHUNK_MAX_EXP upper limit
Thomas Waldmann, 10 years ago
Parent commit: 8997766202
4 changed files, 82 additions and 17 deletions:
  1. borg/archiver.py   (+15 -5)
  2. borg/compress.pyx  (+21 -6)
  3. borg/helpers.py    (+40 -0)
  4. borg/key.py        (+6 -6)

+ 15 - 5
borg/archiver.py

@@ -14,6 +14,7 @@ import traceback

 from . import __version__
 from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS
+from .compress import Compressor, COMPR_BUFFER
 from .repository import Repository
 from .cache import Cache
 from .key import key_creator
@@ -21,7 +22,7 @@ from .helpers import Error, location_validator, format_time, format_file_size, \
     format_file_mode, ExcludePattern, exclude_path, adjust_patterns, to_localtime, timestamp, \
     get_cache_dir, get_keys_dir, format_timedelta, prune_within, prune_split, \
     Manifest, remove_surrogates, update_excludes, format_archive, check_extension_modules, Statistics, \
-    is_cachedir, bigint_to_int, ChunkerParams
+    is_cachedir, bigint_to_int, ChunkerParams, CompressionSpec
 from .remote import RepositoryServer, RemoteRepository


@@ -101,7 +102,9 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
         t0 = datetime.now()
         repository = self.open_repository(args.archive, exclusive=True)
         manifest, key = Manifest.load(repository)
-        key.compression_level = args.compression
+        compr_args = dict(buffer=COMPR_BUFFER)
+        compr_args.update(args.compression)
+        key.compressor = Compressor(**compr_args)
         cache = Cache(repository, key, manifest, do_files=args.cache_files)
         archive = Archive(repository, key, manifest, args.archive.archive, cache=cache,
                           create=True, checkpoint_interval=args.checkpoint_interval,
@@ -634,9 +637,16 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
                                metavar='CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE',
                                help='specify the chunker parameters. default: %d,%d,%d,%d' % CHUNKER_PARAMS)
         subparser.add_argument('-C', '--compression', dest='compression',
-                               type=int, default=0, metavar='N',
-                               help='select compression algorithm and level. 0..9 is supported and means zlib '
-                                    'level 0 (no compression, fast, default) .. zlib level 9 (high compression, slow).')
+                               type=CompressionSpec, default=dict(name='null'), metavar='COMPRESSION',
+                               help='select compression algorithm and level, by giving a number: '
+                                    '0 == no compression [default], '
+                                    '1..9 == zlib level 1..9, '
+                                    '10 == lz4. '
+                                    'Alternatively, you can also give a name and optionally additional args: '
+                                    'null == no compression, '
+                                    'zlib == zlib (default level 6), '
+                                    'zlib,0 .. zlib,9 == zlib (with level 0..9), '
+                                    'lz4 == lz4.')
         subparser.add_argument('archive', metavar='ARCHIVE',
                                type=location_validator(archive=True),
                                help='archive to create')
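
To illustrate the compr_args merge in the hunk above: args.compression is now the dict produced by CompressionSpec (see borg/helpers.py below), so the output buffer is always passed along regardless of the chosen algorithm. A minimal sketch, not part of the commit:

    # e.g. --compression zlib,9  ->  args.compression == {'name': 'zlib', 'level': 9}
    compr_args = dict(buffer=COMPR_BUFFER)           # {'buffer': COMPR_BUFFER}
    compr_args.update({'name': 'zlib', 'level': 9})  # add the parsed spec
    key.compressor = Compressor(**compr_args)        # Compressor(name='zlib', level=9, buffer=COMPR_BUFFER)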

+ 21 - 6
borg/compress.pyx

@@ -1,8 +1,5 @@
 import zlib

-from libc.stdlib cimport malloc, free
-
-
 cdef extern from "lz4.h":
     int LZ4_compress_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
     int LZ4_decompress_safe(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
@@ -40,7 +37,15 @@ class CNULL(CompressorBase):
     """
     ID = b'\x00\x00'
     name = 'null'
-    # base class does all we need
+
+    def compress(self, data):
+        return super().compress(data)
+
+    def decompress(self, data):
+        data = super().decompress(data)
+        if not isinstance(data, bytes):
+            data = bytes(data)
+        return data


 cdef class LZ4(CompressorBase):
@@ -71,6 +76,8 @@ cdef class LZ4(CompressorBase):
         self.bufsize = len(buffer)

     def compress(self, idata):
+        if not isinstance(idata, bytes):
+            idata = bytes(idata)  # code below does not work with memoryview
         cdef int isize = len(idata)
         cdef int osize = self.bufsize
         cdef char *source = idata
@@ -82,6 +89,8 @@ cdef class LZ4(CompressorBase):
         return super().compress(dest[:osize])

     def decompress(self, idata):
+        if not isinstance(idata, bytes):
+            idata = bytes(idata)  # code below does not work with memoryview
         idata = super().decompress(idata)
         cdef int isize = len(idata)
         cdef int osize = self.bufsize
@@ -141,7 +150,7 @@ class Compressor:
     compresses using a compressor with given name and parameters
     decompresses everything we can handle (autodetect)
     """
-    def __init__(self, name='zlib', **kwargs):
+    def __init__(self, name='null', **kwargs):
         self.params = kwargs
         self.compressor = get_compressor(name, **self.params)

@@ -149,8 +158,14 @@ class Compressor:
         return self.compressor.compress(data)

     def decompress(self, data):
+        hdr = bytes(data[:2])  # detect() does not work with memoryview
         for cls in COMPRESSOR_LIST:
-            if cls.detect(data):
+            if cls.detect(hdr):
                 return cls(**self.params).decompress(data)
         else:
             raise ValueError('No decompressor for this data found: %r.', data[:2])
+
+
+# a buffer used for (de)compression result, which can be slightly bigger
+# than the chunk buffer in the worst (incompressible data) case, add 10%:
+COMPR_BUFFER = bytes(int(1.1 * 2 ** 23))  # CHUNK_MAX_EXP == 23
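
A minimal sketch of the resulting API (assuming the extension module is built): any Compressor instance can decompress data produced by any other, because decompress() picks the compressor class by the 2-byte ID header rather than by its configured name.

    from borg.compress import Compressor, COMPR_BUFFER

    data = b'foobar' * 1000
    c = Compressor(name='lz4', buffer=COMPR_BUFFER)
    blob = c.compress(data)              # ID header + lz4 payload

    d = Compressor(buffer=COMPR_BUFFER)  # name defaults to 'null' now
    assert d.decompress(blob) == data    # autodetected as lz4 via COMPRESSOR_LIST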

+ 40 - 0
borg/helpers.py

@@ -278,9 +278,49 @@ def timestamp(s):


 def ChunkerParams(s):
     window_size, chunk_mask, chunk_min, chunk_max = s.split(',')
+    if int(chunk_max) > 23:
+        # do not go beyond 2**23 (8MB) chunk size now,
+        # COMPR_BUFFER can only cope with up to this size
+        raise ValueError
     return int(window_size), int(chunk_mask), int(chunk_min), int(chunk_max)


+def CompressionSpec(s):
+    values = s.split(',')
+    count = len(values)
+    if count < 1:
+        raise ValueError
+    compression = values[0]
+    try:
+        compression = int(compression)
+        if count > 1:
+            raise ValueError
+        # it is just --compression N
+        if compression == 0:
+            return dict(name='null')
+        if 1 <= compression <= 9:
+            return dict(name='zlib', level=compression)
+        if compression == 10:
+            return dict(name='lz4')
+        raise ValueError
+    except ValueError:
+        # --compression algo[,...]
+        name = compression
+        if name in ('null', 'lz4', ):
+            return dict(name=name)
+        if name == 'zlib':
+            if count < 2:
+                level = 6  # default compression level in py stdlib
+            elif count == 2:
+                level = int(values[1])
+                if not 0 <= level <= 9:
+                    raise ValueError
+            else:
+                raise ValueError
+            return dict(name='zlib', level=level)
+        raise ValueError
+
+
 def is_cachedir(path):
     """Determines whether the specified path is a cache directory (and
     therefore should potentially be excluded from the backup) according to
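
For reference, a few inputs and the dicts the parser above returns (illustrative, mirroring the code; invalid specs raise ValueError, which argparse turns into a usage error):

    CompressionSpec('0')       # {'name': 'null'}
    CompressionSpec('5')       # {'name': 'zlib', 'level': 5}
    CompressionSpec('10')      # {'name': 'lz4'}
    CompressionSpec('zlib')    # {'name': 'zlib', 'level': 6}
    CompressionSpec('zlib,9')  # {'name': 'zlib', 'level': 9}
    CompressionSpec('lz4')     # {'name': 'lz4'}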

+ 6 - 6
borg/key.py

@@ -6,9 +6,9 @@ import msgpack
 import textwrap
 import hmac
 from hashlib import sha256
-import zlib

 from .crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
+from .compress import Compressor, COMPR_BUFFER
 from .helpers import IntegrityError, get_keys_dir, Error

 PREFIX = b'\0' * 8
@@ -68,7 +68,7 @@ class KeyBase:
         self.TYPE_STR = bytes([self.TYPE])
         self.repository = repository
         self.target = None  # key location file path / repo obj
-        self.compression_level = 0
+        self.compressor = Compressor('null', buffer=COMPR_BUFFER)

     def id_hash(self, data):
         """Return HMAC hash using the "id" HMAC key
@@ -99,12 +99,12 @@ class PlaintextKey(KeyBase):
         return sha256(data).digest()

     def encrypt(self, data):
-        return b''.join([self.TYPE_STR, zlib.compress(data, self.compression_level)])
+        return b''.join([self.TYPE_STR, self.compressor.compress(data)])

     def decrypt(self, id, data):
         if data[0] != self.TYPE:
             raise IntegrityError('Invalid encryption envelope')
-        data = zlib.decompress(memoryview(data)[1:])
+        data = self.compressor.decompress(memoryview(data)[1:])
         if id and sha256(data).digest() != id:
             raise IntegrityError('Chunk id verification failed')
         return data
@@ -131,7 +131,7 @@ class AESKeyBase(KeyBase):
         return HMAC(self.id_key, data, sha256).digest()

     def encrypt(self, data):
-        data = zlib.compress(data, self.compression_level)
+        data = self.compressor.compress(data)
         self.enc_cipher.reset()
         data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
         hmac = HMAC(self.enc_hmac_key, data, sha256).digest()
@@ -144,7 +144,7 @@ class AESKeyBase(KeyBase):
         if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hmac:
             raise IntegrityError('Encryption envelope checksum mismatch')
         self.dec_cipher.reset(iv=PREFIX + data[33:41])
-        data = zlib.decompress(self.dec_cipher.decrypt(data[41:]))  # should use memoryview
+        data = self.compressor.decompress(self.dec_cipher.decrypt(data[41:]))
         if id and HMAC(self.id_key, data, sha256).digest() != id:
             raise IntegrityError('Chunk id verification failed')
         return data