Browse Source

flexible compression

Thomas Waldmann 9 năm trước cách đây
mục cha
commit
f20a78cda8
6 tập tin đã thay đổi với 226 bổ sung26 xóa
  1. 20 11
      borg/archive.py
  2. 10 6
      borg/archiver.py
  3. 75 0
      borg/helpers.py
  4. 15 7
      borg/key.py
  5. 50 2
      borg/testsuite/helpers.py
  6. 56 0
      docs/misc/compression.conf

+ 20 - 11
borg/archive.py

@@ -15,13 +15,14 @@ import sys
 import time
 from io import BytesIO
 from . import xattr
-from .compress import Compressor, COMPR_BUFFER
+from .compress import COMPR_BUFFER
 from .constants import *  # NOQA
 from .helpers import Chunk, Error, uid2user, user2uid, gid2group, group2gid, \
     parse_timestamp, to_localtime, format_time, format_timedelta, safe_encode, safe_decode, \
     Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, bin_to_hex, \
     ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, \
-    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume
+    PathPrefixPattern, FnmatchPattern, open_item, file_status, format_file_size, consume, \
+    CompressionDecider1, CompressionDecider2, CompressionSpec
 from .repository import Repository
 from .platform import acl_get, acl_set
 from .chunker import Chunker
@@ -125,7 +126,7 @@ class Archive:
 
     def __init__(self, repository, key, manifest, name, cache=None, create=False,
                  checkpoint_interval=300, numeric_owner=False, progress=False,
-                 chunker_params=CHUNKER_PARAMS, start=None, end=None):
+                 chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None):
         self.cwd = os.getcwd()
         self.key = key
         self.repository = repository
@@ -148,6 +149,9 @@ class Archive:
         if create:
             self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
             self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
+            self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
+                                                            compression_files or [])
+            key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
             if name in manifest.archives:
                 raise self.AlreadyExists(name)
             self.last_checkpoint = time.time()
@@ -592,11 +596,15 @@ Number of files: {0.stats.nfiles}'''.format(
         }
         # Only chunkify the file if needed
         if chunks is None:
+            compress = self.compression_decider1.decide(path)
+            logger.debug('%s -> compression %s', path, compress['name'])
             fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
                 chunks = []
                 for data in self.chunker.chunkify(fd, fh):
-                    chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
+                    chunks.append(cache.add_chunk(self.key.id_hash(data),
+                                                  Chunk(data, compress=compress),
+                                                  self.stats))
                     if self.show_progress:
                         self.stats.show_progress(item=item, dt=0.2)
             cache.memorize_file(path_hash, st, [c.id for c in chunks])
@@ -939,7 +947,7 @@ class ArchiveRecreater:
 
     def __init__(self, repository, manifest, key, cache, matcher,
                  exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
-                 chunker_params=None, compression=None,
+                 chunker_params=None, compression=None, compression_files=None,
                  dry_run=False, stats=False, progress=False, file_status_printer=None):
         self.repository = repository
         self.key = key
@@ -952,12 +960,12 @@ class ArchiveRecreater:
         self.keep_tag_files = keep_tag_files
 
         self.chunker_params = chunker_params or CHUNKER_PARAMS
-        self.compression = compression or dict(name='none')
-        self.seen_chunks = set()
         self.recompress = bool(compression)
-        compr_args = dict(buffer=COMPR_BUFFER)
-        compr_args.update(self.compression)
-        key.compressor = Compressor(**compr_args)
+        self.compression = compression or CompressionSpec('none')
+        self.seen_chunks = set()
+        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
+                                                            compression_files or [])
+        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
 
         self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
         logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
@@ -1045,6 +1053,7 @@ class ArchiveRecreater:
 
     def process_chunks(self, archive, target, item):
         """Return new chunk ID list for 'item'."""
+        # TODO: support --compression-from
         if not self.recompress and not target.recreate_rechunkify:
             for chunk_id, size, csize in item[b'chunks']:
                 self.cache.chunk_incref(chunk_id, target.stats)
@@ -1239,7 +1248,7 @@ class ArchiveRecreater:
     def create_target_archive(self, name):
         target = Archive(self.repository, self.key, self.manifest, name, create=True,
                           progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
-                          checkpoint_interval=0)
+                          checkpoint_interval=0, compression=self.compression)
         target.recreate_partial_chunks = None
         target.recreate_uncomitted_bytes = 0
         return target

+ 10 - 6
borg/archiver.py

@@ -279,14 +279,12 @@ class Archiver:
         dry_run = args.dry_run
         t0 = datetime.utcnow()
         if not dry_run:
-            compr_args = dict(buffer=COMPR_BUFFER)
-            compr_args.update(args.compression)
-            key.compressor = Compressor(**compr_args)
             with Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) as cache:
                 archive = Archive(repository, key, manifest, args.location.archive, cache=cache,
                                   create=True, checkpoint_interval=args.checkpoint_interval,
                                   numeric_owner=args.numeric_owner, progress=args.progress,
-                                  chunker_params=args.chunker_params, start=t0)
+                                  chunker_params=args.chunker_params, start=t0,
+                                  compression=args.compression, compression_files=args.compression_files)
                 create_inner(archive, cache)
         else:
             create_inner(None, None)
@@ -868,8 +866,8 @@ class Archiver:
 
         recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
                                      exclude_caches=args.exclude_caches, exclude_if_present=args.exclude_if_present,
-                                     keep_tag_files=args.keep_tag_files,
-                                     compression=args.compression, chunker_params=args.chunker_params,
+                                     keep_tag_files=args.keep_tag_files, chunker_params=args.chunker_params,
+                                     compression=args.compression, compression_files=args.compression_files,
                                      progress=args.progress, stats=args.stats,
                                      file_status_printer=self.print_file_status,
                                      dry_run=args.dry_run)
@@ -1349,6 +1347,9 @@ class Archiver:
                                         'zlib,0 .. zlib,9 == zlib (with level 0..9),\n'
                                         'lzma == lzma (default level 6),\n'
                                         'lzma,0 .. lzma,9 == lzma (with level 0..9).')
+        archive_group.add_argument('--compression-from', dest='compression_files',
+                                   type=argparse.FileType('r'), action='append',
+                                   metavar='COMPRESSIONCONFIG', help='read compression patterns from COMPRESSIONCONFIG, one per line')
 
         subparser.add_argument('location', metavar='ARCHIVE',
                                type=location_validator(archive=True),
@@ -1815,6 +1816,9 @@ class Archiver:
                                         'zlib,0 .. zlib,9 == zlib (with level 0..9),\n'
                                         'lzma == lzma (default level 6),\n'
                                         'lzma,0 .. lzma,9 == lzma (with level 0..9).')
+        archive_group.add_argument('--compression-from', dest='compression_files',
+                                   type=argparse.FileType('r'), action='append',
+                                   metavar='COMPRESSIONCONFIG', help='read compression patterns from COMPRESSIONCONFIG, one per line')
         archive_group.add_argument('--chunker-params', dest='chunker_params',
                                    type=ChunkerParams, default=None,
                                    metavar='CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE',

+ 75 - 0
borg/helpers.py

@@ -31,6 +31,7 @@ from . import hashindex
 from . import chunker
 from .constants import *  # NOQA
 from . import crypto
+from .compress import COMPR_BUFFER
 from . import shellpattern
 import msgpack
 import msgpack.fallback
@@ -1423,3 +1424,77 @@ except ImportError:
 
 def scandir_inorder(path='.'):
     return sorted(scandir(path), key=lambda dirent: dirent.inode())
+
+
+def clean_lines(lines, lstrip=None, rstrip=None, remove_empty=True, remove_comments=True):
+    """
+    clean lines (usually read from a config file):
+
+    1. strip whitespace (left and right), 2. remove empty lines, 3. remove comments.
+
+    note: only "pure comment lines" are supported, no support for "trailing comments".
+
+    :param lines: input line iterator (e.g. list or open text file) that gives unclean input lines
+    :param lstrip: lstrip call arguments or False, if lstripping is not desired
+    :param rstrip: rstrip call arguments or False, if rstripping is not desired
+    :param remove_comments: remove comment lines (lines starting with "#")
+    :param remove_empty: remove empty lines
+    :return: yields processed lines
+    """
+    for line in lines:
+        if lstrip is not False:
+            line = line.lstrip(lstrip)
+        if rstrip is not False:
+            line = line.rstrip(rstrip)
+        if remove_empty and not line:
+            continue
+        if remove_comments and line.startswith('#'):
+            continue
+        yield line
+
+
+class CompressionDecider1:
+    def __init__(self, compression, compression_files):
+        """
+        Initialize a CompressionDecider instance (and read config files, if needed).
+
+        :param compression: default CompressionSpec (e.g. from --compression option)
+        :param compression_files: list of compression config files (e.g. from --compression-from) or
+                                  a list of other line iterators
+        """
+        self.compression = compression
+        if not compression_files:
+            self.matcher = None
+        else:
+            self.matcher = PatternMatcher(fallback=compression)
+            for file in compression_files:
+                try:
+                    for line in clean_lines(file):
+                        try:
+                            compr_spec, fn_pattern = line.split(':', 1)
+                        except:
+                            continue
+                        self.matcher.add([parse_pattern(fn_pattern)], CompressionSpec(compr_spec))
+                finally:
+                    if hasattr(file, 'close'):
+                        file.close()
+
+    def decide(self, path):
+        if self.matcher is not None:
+            return self.matcher.match(path)
+        return self.compression
+
+
+class CompressionDecider2:
+    def __init__(self, compression):
+        self.compression = compression
+
+    def decide(self, chunk):
+        # nothing fancy here yet: we either use what the metadata says or the default
+        # later, we can decide based on the chunk data also.
+        # if we compress the data here to decide, we can even update the chunk data
+        # and modify the metadata as desired.
+        compr_spec = chunk.meta.get('compress', self.compression)
+        compr_args = dict(buffer=COMPR_BUFFER)
+        compr_args.update(compr_spec)
+        return compr_args, chunk

+ 15 - 7
borg/key.py

@@ -7,13 +7,13 @@ import textwrap
 from hmac import compare_digest
 from hashlib import sha256, pbkdf2_hmac
 
-from .helpers import Chunk, IntegrityError, get_keys_dir, Error, yes, bin_to_hex
+from .helpers import Chunk, IntegrityError, get_keys_dir, Error, yes, bin_to_hex, CompressionDecider2, CompressionSpec
 from .logger import create_logger
 logger = create_logger()
 
 from .constants import *  # NOQA
 from .crypto import AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks, hmac_sha256
-from .compress import Compressor, COMPR_BUFFER
+from .compress import Compressor, COMPR_BUFFER, get_compressor
 import msgpack
 
 PREFIX = b'\0' * 8
@@ -71,12 +71,20 @@ class KeyBase:
         self.TYPE_STR = bytes([self.TYPE])
         self.repository = repository
         self.target = None  # key location file path / repo obj
-        self.compressor = Compressor('none', buffer=COMPR_BUFFER)
+        self.compression_decider2 = CompressionDecider2(CompressionSpec('none'))
+        self.compressor = Compressor('none', buffer=COMPR_BUFFER)  # for decompression
 
     def id_hash(self, data):
         """Return HMAC hash using the "id" HMAC key
         """
 
+    def compress(self, chunk):
+        compr_args, chunk = self.compression_decider2.decide(chunk)
+        compressor = Compressor(**compr_args)
+        meta, data = chunk
+        data = compressor.compress(data)
+        return Chunk(data, **meta)
+
     def encrypt(self, chunk):
         pass
 
@@ -102,8 +110,8 @@ class PlaintextKey(KeyBase):
         return sha256(data).digest()
 
     def encrypt(self, chunk):
-        meta, data = chunk
-        return b''.join([self.TYPE_STR, self.compressor.compress(data)])
+        chunk = self.compress(chunk)
+        return b''.join([self.TYPE_STR, chunk.data])
 
     def decrypt(self, id, data):
         if data[0] != self.TYPE:
@@ -135,9 +143,9 @@ class AESKeyBase(KeyBase):
         return hmac_sha256(self.id_key, data)
 
     def encrypt(self, chunk):
-        data = self.compressor.compress(chunk.data)
+        chunk = self.compress(chunk)
         self.enc_cipher.reset()
-        data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
+        data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(chunk.data)))
         hmac = hmac_sha256(self.enc_hmac_key, data)
         return b''.join((self.TYPE_STR, hmac, data))
 

+ 50 - 2
borg/testsuite/helpers.py

@@ -10,11 +10,12 @@ import msgpack
 import msgpack.fallback
 import time
 
-from ..helpers import Location, format_file_size, format_timedelta, make_path_safe, \
+from ..helpers import Location, format_file_size, format_timedelta, make_path_safe, clean_lines, \
     prune_within, prune_split, get_cache_dir, get_keys_dir, Statistics, is_slow_msgpack, \
     yes, TRUISH, FALSISH, DEFAULTISH, \
-    StableDict, int_to_bigint, bigint_to_int, bin_to_hex, parse_timestamp, CompressionSpec, ChunkerParams, Chunk, \
+    StableDict, int_to_bigint, bigint_to_int, bin_to_hex, parse_timestamp, ChunkerParams, Chunk, \
     ProgressIndicatorPercent, ProgressIndicatorEndless, load_excludes, parse_pattern, \
+    CompressionSpec, CompressionDecider1, CompressionDecider2, \
     PatternMatcher, RegexPattern, PathPrefixPattern, FnmatchPattern, ShellPattern, partial_format, ChunkIteratorFileWrapper
 from . import BaseTestCase, environment_variable, FakeInputs
 
@@ -915,3 +916,50 @@ def test_chunk_file_wrapper():
     cfw = ChunkIteratorFileWrapper(iter([]))
     assert cfw.read(2) == b''
     assert cfw.exhausted
+
+
+def test_clean_lines():
+    conf = """\
+#comment
+data1 #data1
+data2
+
+ data3
+""".splitlines(keepends=True)
+    assert list(clean_lines(conf)) == ['data1 #data1', 'data2', 'data3', ]
+    assert list(clean_lines(conf, lstrip=False)) == ['data1 #data1', 'data2', ' data3', ]
+    assert list(clean_lines(conf, rstrip=False)) == ['data1 #data1\n', 'data2\n', 'data3\n', ]
+    assert list(clean_lines(conf, remove_empty=False)) == ['data1 #data1', 'data2', '', 'data3', ]
+    assert list(clean_lines(conf, remove_comments=False)) == ['#comment', 'data1 #data1', 'data2', 'data3', ]
+
+
+def test_compression_decider1():
+    default = CompressionSpec('zlib')
+    conf = """
+# use super-fast lz4 compression on huge VM files in this path:
+lz4:/srv/vm_disks
+
+# jpeg or zip files do not compress:
+none:*.jpeg
+none:*.zip
+""".splitlines()
+
+    cd = CompressionDecider1(default, [])  # no conf, always use default
+    assert cd.decide('/srv/vm_disks/linux')['name'] == 'zlib'
+    assert cd.decide('test.zip')['name'] == 'zlib'
+    assert cd.decide('test')['name'] == 'zlib'
+
+    cd = CompressionDecider1(default, [conf, ])
+    assert cd.decide('/srv/vm_disks/linux')['name'] == 'lz4'
+    assert cd.decide('test.zip')['name'] == 'none'
+    assert cd.decide('test')['name'] == 'zlib'  # no match in conf, use default
+
+
+def test_compression_decider2():
+    default = CompressionSpec('zlib')
+
+    cd = CompressionDecider2(default)
+    compr_spec, chunk = cd.decide(Chunk(None))
+    assert compr_spec['name'] == 'zlib'
+    compr_spec, chunk = cd.decide(Chunk(None, compress=CompressionSpec('lzma')))
+    assert compr_spec['name'] == 'lzma'

+ 56 - 0
docs/misc/compression.conf

@@ -0,0 +1,56 @@
+# example config file for --compression-from option
+#
+# Format of non-comment / non-empty lines:
+# <compression-spec>:<path/filename pattern>
+# compression-spec is same format as for --compression option
+# path/filename pattern is same format as for --exclude option
+
+# archives / files:
+none:*.gz
+none:*.tgz
+none:*.bz2
+none:*.tbz2
+none:*.xz
+none:*.txz
+none:*.lzma
+none:*.lzo
+none:*.zip
+none:*.rar
+none:*.7z
+
+# audio:
+none:*.mp3
+none:*.ogg
+none:*.oga
+none:*.flac
+none:*.aac
+none:*.m4a
+
+# video:
+none:*.mp4
+none:*.mkv
+none:*.m4v
+none:*.avi
+none:*.mpg
+none:*.mpeg
+none:*.webm
+none:*.vob
+none:*.ts
+none:*.ogv
+none:*.mov
+none:*.flv
+none:*.ogm
+
+# pictures/images
+none:*.jpg
+none:*.jpeg
+none:*.png
+none:*.gif
+
+# disk images
+none:*.dmg
+
+# software archives
+none:*.rpm
+none:*.deb
+none:*.msi