ソースを参照

refactor compressors to new api

legacy: add/remove ctype/clevel bytes prefix of compressed data

new: use a separate metadata dict

compressors: use an int as ID, not a len 1 bytestring
Thomas Waldmann 2 年 前
コミット
4c9ed2a6c6

+ 1 - 1
src/borg/archive.py

@@ -2273,7 +2273,7 @@ class ArchiveRecreater:
             compr_hdr = bytes((old_meta["ctype"], old_meta["clevel"]))
             compressor_cls, level = Compressor.detect(compr_hdr)
             if (
-                compressor_cls.name == self.repo_objs.compressor.decide(data).name
+                compressor_cls.name == self.repo_objs.compressor.decide({}, data).name
                 and level == self.repo_objs.compressor.level
             ):
                 # Stored chunk has the same compression method and level as we wanted

+ 150 - 108
src/borg/compress.pyx

@@ -56,22 +56,18 @@ cdef class CompressorBase:
     also handles compression format auto detection and
     adding/stripping the ID header (which enable auto detection).
     """
-    ID = b'\xFF'  # reserved and not used
-                  # overwrite with a unique 1-byte bytestring in child classes
+    ID = 0xFF  # reserved and not used
+               # overwrite with a unique 1-byte integer in child classes
     name = 'baseclass'
 
     @classmethod
     def detect(cls, data):
-        return data.startswith(cls.ID)
+        return data and data[0] == cls.ID
 
-    def __init__(self, level=255, **kwargs):
+    def __init__(self, level=255, legacy_mode=False, **kwargs):
         assert 0 <= level <= 255
         self.level = level
-        if self.ID is not None:
-            self.id_level = self.ID + bytes((level, ))  # level 255 means "unknown level"
-            assert len(self.id_level) == 2
-        else:
-            self.id_level = None
+        self.legacy_mode = legacy_mode  # True: support prefixed ctype/clevel bytes
 
     def decide(self, data):
         """
@@ -86,24 +82,48 @@ cdef class CompressorBase:
         """
         return self
 
-    def compress(self, data):
+    def compress(self, meta, data):
         """
-        Compress *data* (bytes) and return bytes result. Prepend the ID bytes of this compressor,
-        which is needed so that the correct decompressor can be used for decompression.
+        Compress *data* (bytes) and return compression metadata and compressed bytes.
         """
-        # add id_level bytes
-        return self.id_level + data
+        if self.legacy_mode:
+            return None, bytes((self.ID, self.level)) + data
+        else:
+            meta["ctype"] = self.ID
+            meta["clevel"] = self.level
+            meta["csize"] = len(data)
+            return meta, data
 
-    def decompress(self, data):
+    def decompress(self, meta, data):
         """
         Decompress *data* (preferably a memoryview, bytes also acceptable) and return bytes result.
-        The leading Compressor ID bytes need to be present.
+
+        Legacy mode: The leading Compressor ID bytes need to be present.
 
         Only handles input generated by _this_ Compressor - for a general purpose
         decompression method see *Compressor.decompress*.
         """
-        # strip id_level bytes
-        return data[2:]
+        if self.legacy_mode:
+            assert meta is None
+            meta = {}
+            meta["ctype"] = data[0]
+            meta["clevel"] = data[1]
+            meta["csize"] = len(data)
+            return meta, data[2:]
+        else:
+            assert isinstance(meta, dict)
+            assert "ctype" in meta
+            assert "clevel" in meta
+            return meta, data
+
+    def check_fix_size(self, meta, data):
+        if "size" in meta:
+            assert meta["size"] == len(data)
+        elif self.legacy_mode:
+            meta["size"] = len(data)
+        else:
+            pass  # raise ValueError("size not present and not in legacy mode")
+
 
 cdef class DecidingCompressor(CompressorBase):
     """
@@ -112,12 +132,12 @@ cdef class DecidingCompressor(CompressorBase):
     """
     name = 'decidebaseclass'
 
-    def __init__(self, level=255, **kwargs):
-        super().__init__(level=level, **kwargs)
+    def __init__(self, level=255, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
 
-    def _decide(self, data):
+    def _decide(self, meta, data):
         """
-        Decides what to do with *data*. Returns (compressor, compressed_data).
+        Decides what to do with *data*. Returns (compressor, (meta, compressed_data)).
 
         *compressed_data* can be the result of *data* being processed by *compressor*,
         if that is generated as a side-effect of the decision process, or None otherwise.
@@ -127,47 +147,50 @@ cdef class DecidingCompressor(CompressorBase):
         """
         raise NotImplementedError
 
-    def decide(self, data):
-        return self._decide(data)[0]
+    def decide(self, meta, data):
+        return self._decide(meta, data)[0]
 
-    def decide_compress(self, data):
+    def decide_compress(self, meta, data):
         """
         Decides what to do with *data* and handle accordingly. Returns (compressor, compressed_data).
 
         *compressed_data* is the result of *data* being processed by *compressor*.
         """
-        compressor, compressed_data = self._decide(data)
+        compressor, (meta, compressed_data) = self._decide(meta, data)
 
         if compressed_data is None:
-            compressed_data = compressor.compress(data)
+            meta, compressed_data = compressor.compress(meta, data)
 
         if compressor is self:
             # call super class to add ID bytes
-            return self, super().compress(compressed_data)
+            return self, super().compress(meta, compressed_data)
 
-        return compressor, compressed_data
+        return compressor, (meta, compressed_data)
 
-    def compress(self, data):
-        return self.decide_compress(data)[1]
+    def compress(self, meta, data):
+        meta["size"] = len(data)
+        return self.decide_compress(meta, data)[1]
 
 class CNONE(CompressorBase):
     """
     none - no compression, just pass through data
     """
-    ID = b'\x00'
+    ID = 0x00
     name = 'none'
 
-    def __init__(self, level=255, **kwargs):
-        super().__init__(level=level, **kwargs)  # no defined levels for CNONE, so just say "unknown"
+    def __init__(self, level=255, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)  # no defined levels for CNONE, so just say "unknown"
 
-    def compress(self, data):
-        return super().compress(data)
+    def compress(self, meta, data):
+        meta["size"] = len(data)
+        return super().compress(meta, data)
 
-    def decompress(self, data):
-        data = super().decompress(data)
+    def decompress(self, meta, data):
+        meta, data = super().decompress(meta, data)
         if not isinstance(data, bytes):
             data = bytes(data)
-        return data
+        self.check_fix_size(meta, data)
+        return meta, data
 
 
 class LZ4(DecidingCompressor):
@@ -179,13 +202,13 @@ class LZ4(DecidingCompressor):
         - wrapper releases CPython's GIL to support multithreaded code
         - uses safe lz4 methods that never go beyond the end of the output buffer
     """
-    ID = b'\x01'
+    ID = 0x01
     name = 'lz4'
 
-    def __init__(self, level=255, **kwargs):
-        super().__init__(level=level, **kwargs)  # no defined levels for LZ4, so just say "unknown"
+    def __init__(self, level=255, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)  # no defined levels for LZ4, so just say "unknown"
 
-    def _decide(self, idata):
+    def _decide(self, meta, idata):
         """
         Decides what to do with *data*. Returns (compressor, lz4_data).
 
@@ -206,12 +229,12 @@ class LZ4(DecidingCompressor):
             raise Exception('lz4 compress failed')
         # only compress if the result actually is smaller
         if osize < isize:
-            return self, dest[:osize]
+            return self, (meta, dest[:osize])
         else:
-            return NONE_COMPRESSOR, None
+            return NONE_COMPRESSOR, (meta, None)
 
-    def decompress(self, idata):
-        idata = super().decompress(idata)
+    def decompress(self, meta, data):
+        meta, idata = super().decompress(meta, data)
         if not isinstance(idata, bytes):
             idata = bytes(idata)  # code below does not work with memoryview
         cdef int isize = len(idata)
@@ -237,23 +260,25 @@ class LZ4(DecidingCompressor):
                 raise DecompressionError('lz4 decompress failed')
             # likely the buffer was too small, get a bigger one:
             osize = int(1.5 * osize)
-        return dest[:rsize]
+        data = dest[:rsize]
+        self.check_fix_size(meta, data)
+        return meta, data
 
 
 class LZMA(DecidingCompressor):
     """
     lzma compression / decompression
     """
-    ID = b'\x02'
+    ID = 0x02
     name = 'lzma'
 
-    def __init__(self, level=6, **kwargs):
-        super().__init__(level=level, **kwargs)
+    def __init__(self, level=6, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
         self.level = level
         if lzma is None:
             raise ValueError('No lzma support found.')
 
-    def _decide(self, data):
+    def _decide(self, meta, data):
         """
         Decides what to do with *data*. Returns (compressor, lzma_data).
 
@@ -262,14 +287,16 @@ class LZMA(DecidingCompressor):
         # we do not need integrity checks in lzma, we do that already
         lzma_data = lzma.compress(data, preset=self.level, check=lzma.CHECK_NONE)
         if len(lzma_data) < len(data):
-            return self, lzma_data
+            return self, (meta, lzma_data)
         else:
-            return NONE_COMPRESSOR, None
+            return NONE_COMPRESSOR, (meta, None)
 
-    def decompress(self, data):
-        data = super().decompress(data)
+    def decompress(self, meta, data):
+        meta, data = super().decompress(meta, data)
         try:
-            return lzma.decompress(data)
+            data = lzma.decompress(data)
+            self.check_fix_size(meta, data)
+            return meta, data
         except lzma.LZMAError as e:
             raise DecompressionError(str(e)) from None
 
@@ -279,14 +306,14 @@ class ZSTD(DecidingCompressor):
     # This is a NOT THREAD SAFE implementation.
     # Only ONE python context must be created at a time.
     # It should work flawlessly as long as borg will call ONLY ONE compression job at time.
-    ID = b'\x03'
+    ID = 0x03
     name = 'zstd'
 
-    def __init__(self, level=3, **kwargs):
-        super().__init__(level=level, **kwargs)
+    def __init__(self, level=3, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
         self.level = level
 
-    def _decide(self, idata):
+    def _decide(self, meta, idata):
         """
         Decides what to do with *data*. Returns (compressor, zstd_data).
 
@@ -308,12 +335,12 @@ class ZSTD(DecidingCompressor):
             raise Exception('zstd compress failed: %s' % ZSTD_getErrorName(osize))
         # only compress if the result actually is smaller
         if osize < isize:
-            return self, dest[:osize]
+            return self, (meta, dest[:osize])
         else:
-            return NONE_COMPRESSOR, None
+            return NONE_COMPRESSOR, (meta, None)
 
-    def decompress(self, idata):
-        idata = super().decompress(idata)
+    def decompress(self, meta, data):
+        meta, idata = super().decompress(meta, data)
         if not isinstance(idata, bytes):
             idata = bytes(idata)  # code below does not work with memoryview
         cdef int isize = len(idata)
@@ -337,21 +364,23 @@ class ZSTD(DecidingCompressor):
             raise DecompressionError('zstd decompress failed: %s' % ZSTD_getErrorName(rsize))
         if rsize != osize:
             raise DecompressionError('zstd decompress failed: size mismatch')
-        return dest[:osize]
+        data = dest[:osize]
+        self.check_fix_size(meta, data)
+        return meta, data
 
 
 class ZLIB(DecidingCompressor):
     """
     zlib compression / decompression (python stdlib)
     """
-    ID = b'\x05'
+    ID = 0x05
     name = 'zlib'
 
-    def __init__(self, level=6, **kwargs):
-        super().__init__(level=level, **kwargs)
+    def __init__(self, level=6, legacy_mode=False, **kwargs):
+        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
         self.level = level
 
-    def _decide(self, data):
+    def _decide(self, meta, data):
         """
         Decides what to do with *data*. Returns (compressor, zlib_data).
 
@@ -359,14 +388,16 @@ class ZLIB(DecidingCompressor):
         """
         zlib_data = zlib.compress(data, self.level)
         if len(zlib_data) < len(data):
-            return self, zlib_data
+            return self, (meta, zlib_data)
         else:
-            return NONE_COMPRESSOR, None
+            return NONE_COMPRESSOR, (meta, None)
 
-    def decompress(self, data):
-        data = super().decompress(data)
+    def decompress(self, meta, data):
+        meta, data = super().decompress(meta, data)
         try:
-            return zlib.decompress(data)
+            data = zlib.decompress(data)
+            self.check_fix_size(meta, data)
+            return meta, data
         except zlib.error as e:
             raise DecompressionError(str(e)) from None
 
@@ -382,7 +413,7 @@ class ZLIB_legacy(CompressorBase):
           Newer borg uses the ZLIB class that has separate ID bytes (as all the other
           compressors) and does not need this hack.
     """
-    ID = b'\x08'  # not used here, see detect()
+    ID = 0x08  # not used here, see detect()
     # avoid all 0x.8 IDs elsewhere!
     name = 'zlib_legacy'
 
@@ -398,14 +429,14 @@ class ZLIB_legacy(CompressorBase):
         super().__init__(level=level, **kwargs)
         self.level = level
 
-    def compress(self, data):
+    def compress(self, meta, data):
         # note: for compatibility no super call, do not add ID bytes
-        return zlib.compress(data, self.level)
+        return None, zlib.compress(data, self.level)
 
-    def decompress(self, data):
+    def decompress(self, meta, data):
         # note: for compatibility no super call, do not strip ID bytes
         try:
-            return zlib.decompress(data)
+            return meta, zlib.decompress(data)
         except zlib.error as e:
             raise DecompressionError(str(e)) from None
 
@@ -425,7 +456,7 @@ class Auto(CompressorBase):
         super().__init__()
         self.compressor = compressor
 
-    def _decide(self, data):
+    def _decide(self, meta, data):
         """
         Decides what to do with *data*. Returns (compressor, compressed_data).
 
@@ -448,33 +479,33 @@ class Auto(CompressorBase):
         Note: While it makes no sense, the expensive compressor may well be set
         to the LZ4 compressor.
         """
-        compressor, compressed_data = LZ4_COMPRESSOR.decide_compress(data)
+        compressor, (meta, compressed_data) = LZ4_COMPRESSOR.decide_compress(meta, data)
         # compressed_data includes the compression type header, while data does not yet
         ratio = len(compressed_data) / (len(data) + 2)
         if ratio < 0.97:
-            return self.compressor, compressed_data
+            return self.compressor, (meta, compressed_data)
         else:
-            return compressor, compressed_data
+            return compressor, (meta, compressed_data)
 
-    def decide(self, data):
-        return self._decide(data)[0]
+    def decide(self, meta, data):
+        return self._decide(meta, data)[0]
 
-    def compress(self, data):
-        compressor, cheap_compressed_data = self._decide(data)
+    def compress(self, meta, data):
+        compressor, (cheap_meta, cheap_compressed_data) = self._decide(dict(meta), data)
         if compressor in (LZ4_COMPRESSOR, NONE_COMPRESSOR):
             # we know that trying to compress with expensive compressor is likely pointless,
             # so we fallback to return the cheap compressed data.
-            return cheap_compressed_data
+            return cheap_meta, cheap_compressed_data
         # if we get here, the decider decided to try the expensive compressor.
         # we also know that the compressed data returned by the decider is lz4 compressed.
-        expensive_compressed_data = compressor.compress(data)
+        expensive_meta, expensive_compressed_data = compressor.compress(dict(meta), data)
         ratio = len(expensive_compressed_data) / len(cheap_compressed_data)
         if ratio < 0.99:
             # the expensive compressor managed to squeeze the data significantly better than lz4.
-            return expensive_compressed_data
+            return expensive_meta, expensive_compressed_data
         else:
             # otherwise let's just store the lz4 data, which decompresses extremely fast.
-            return cheap_compressed_data
+            return cheap_meta, cheap_compressed_data
 
     def decompress(self, data):
         raise NotImplementedError
@@ -487,14 +518,14 @@ class ObfuscateSize(CompressorBase):
     """
     Meta-Compressor that obfuscates the compressed data size.
     """
-    ID = b'\x04'
+    ID = 0x04
     name = 'obfuscate'
 
     header_fmt = Struct('<I')
     header_len = len(header_fmt.pack(0))
 
-    def __init__(self, level=None, compressor=None):
-        super().__init__(level=level)  # data will be encrypted, so we can tell the level
+    def __init__(self, level=None, compressor=None, legacy_mode=False):
+        super().__init__(level=level, legacy_mode=legacy_mode)  # data will be encrypted, so we can tell the level
         self.compressor = compressor
         if level is None:
             pass  # decompression
@@ -524,25 +555,30 @@ class ObfuscateSize(CompressorBase):
     def _random_padding_obfuscate(self, compr_size):
         return int(self.max_padding_size * random.random())
 
-    def compress(self, data):
-        compressed_data = self.compressor.compress(data)  # compress data
+    def compress(self, meta, data):
+        assert not self.legacy_mode  # we never call this in legacy mode
+        meta = dict(meta)  # make a copy, do not modify caller's dict
+        meta, compressed_data = self.compressor.compress(meta, data)  # compress data
         compr_size = len(compressed_data)
-        header = self.header_fmt.pack(compr_size)
+        assert "csize" in meta, repr(meta)
+        meta["psize"] = meta["csize"]  # psize (payload size) is the csize (compressed size) of the inner compressor
         addtl_size = self._obfuscate(compr_size)
         addtl_size = max(0, addtl_size)  # we can only make it longer, not shorter!
         addtl_size = min(MAX_DATA_SIZE - 1024 - compr_size, addtl_size)  # stay away from MAX_DATA_SIZE
         trailer = bytes(addtl_size)
-        obfuscated_data = b''.join([header, compressed_data, trailer])
-        return super().compress(obfuscated_data)  # add ID header
+        obfuscated_data = compressed_data + trailer
+        meta["csize"] = len(obfuscated_data)  # csize is the overall output size of this "obfuscation compressor"
+        return meta, obfuscated_data  # for borg2 it is enough that we have the payload size in meta["psize"]
 
-    def decompress(self, data):
-        obfuscated_data = super().decompress(data)  # remove obfuscator ID header
+    def decompress(self, meta, data):
+        assert self.legacy_mode  # borg2 never dispatches to this, only used for legacy mode
+        meta, obfuscated_data = super().decompress(meta, data)  # remove obfuscator ID header
         compr_size = self.header_fmt.unpack(obfuscated_data[0:self.header_len])[0]
         compressed_data = obfuscated_data[self.header_len:self.header_len+compr_size]
         if self.compressor is None:
             compressor_cls = Compressor.detect(compressed_data)[0]
             self.compressor = compressor_cls()
-        return self.compressor.decompress(compressed_data)  # decompress data
+        return self.compressor.decompress(meta, compressed_data)  # decompress data
 
 
 # Maps valid compressor names to their class
@@ -576,12 +612,18 @@ class Compressor:
         self.params = kwargs
         self.compressor = get_compressor(name, **self.params)
 
-    def compress(self, data):
-        return self.compressor.compress(data)
+    def compress(self, meta, data):
+        return self.compressor.compress(meta, data)
 
-    def decompress(self, data):
-        compressor_cls = self.detect(data)[0]
-        return compressor_cls(**self.params).decompress(data)
+    def decompress(self, meta, data):
+        if self.compressor.legacy_mode:
+            hdr = data[:2]
+        else:
+            ctype = meta["ctype"]
+            clevel = meta["clevel"]
+            hdr = bytes((ctype, clevel))
+        compressor_cls = self.detect(hdr)[0]
+        return compressor_cls(**self.params).decompress(meta, data)
 
     @staticmethod
     def detect(data):

+ 8 - 6
src/borg/remote.py

@@ -1293,22 +1293,24 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None
         raise ValueError("decrypted_cache and pack/unpack/transform are incompatible")
     elif decrypted_cache:
         repo_objs = decrypted_cache
-        # 32 bit csize, 64 bit (8 byte) xxh64
-        cache_struct = struct.Struct("=I8s")
+        # 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel
+        cache_struct = struct.Struct("=I8sBB")
         compressor = Compressor("lz4")
 
         def pack(data):
             csize, decrypted = data
-            compressed = compressor.compress(decrypted)
-            return cache_struct.pack(csize, xxh64(compressed)) + compressed
+            meta, compressed = compressor.compress({}, decrypted)
+            return cache_struct.pack(csize, xxh64(compressed), meta["ctype"], meta["clevel"]) + compressed
 
         def unpack(data):
             data = memoryview(data)
-            csize, checksum = cache_struct.unpack(data[: cache_struct.size])
+            csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size])
             compressed = data[cache_struct.size :]
             if checksum != xxh64(compressed):
                 raise IntegrityError("detected corrupted data in metadata cache")
-            return csize, compressor.decompress(compressed)
+            meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed))
+            _, decrypted = compressor.decompress(meta, compressed)
+            return csize, decrypted
 
         def transform(id_, data):
             meta, decrypted = repo_objs.parse(id_, data)

+ 19 - 23
src/borg/repoobj.py

@@ -1,7 +1,7 @@
 from struct import Struct
 
 from .helpers import msgpack
-from .compress import Compressor, LZ4_COMPRESSOR
+from .compress import Compressor, LZ4_COMPRESSOR, get_compressor
 
 
 class RepoObj:
@@ -40,20 +40,16 @@ class RepoObj:
         assert compress or size is not None and ctype is not None and clevel is not None
         if compress:
             assert size is None or size == len(data)
-            size = len(data) if size is None else size
-            data_compressed = self.compressor.compress(data)  # TODO: compressor also adds compressor type/level bytes
-            ctype = data_compressed[0]
-            clevel = data_compressed[1]
-            data_compressed = data_compressed[2:]  # strip the type/level bytes
+            meta, data_compressed = self.compressor.compress(meta, data)
         else:
             assert isinstance(size, int)
+            meta["size"] = size
             assert isinstance(ctype, int)
+            meta["ctype"] = ctype
             assert isinstance(clevel, int)
+            meta["clevel"] = clevel
             data_compressed = data  # is already compressed, is NOT prefixed by type/level bytes
-        meta["size"] = size
-        meta["csize"] = len(data_compressed)
-        meta["ctype"] = ctype
-        meta["clevel"] = clevel
+            meta["csize"] = len(data_compressed)
         data_encrypted = self.key.encrypt(id, data_compressed)
         meta_packed = msgpack.packb(meta)
         meta_encrypted = self.key.encrypt(id, meta_packed)
@@ -92,13 +88,14 @@ class RepoObj:
         if decompress:
             ctype = meta["ctype"]
             clevel = meta["clevel"]
-            csize = meta["csize"]  # for obfuscation purposes, data_compressed may be longer than csize
+            csize = meta["csize"]  # always the overall size
+            assert csize == len(data_compressed)
+            psize = meta.get("psize", csize)  # obfuscation: psize (payload size) is potentially less than csize.
+            assert psize <= csize
             compr_hdr = bytes((ctype, clevel))
             compressor_cls, compression_level = Compressor.detect(compr_hdr)
             compressor = compressor_cls(level=compression_level)
-            data = compressor.decompress(
-                compr_hdr + data_compressed[:csize]
-            )  # TODO: decompressor still needs type/level bytes
+            meta, data = compressor.decompress(meta, data_compressed[:psize])
             self.key.assert_id(id, data)
         else:
             data = data_compressed  # does not include the type/level bytes
@@ -113,7 +110,7 @@ class RepoObj1:  # legacy
 
     def __init__(self, key):
         self.key = key
-        self.compressor = LZ4_COMPRESSOR
+        self.compressor = get_compressor("lz4", legacy_mode=True)
 
     def id_hash(self, data: bytes) -> bytes:
         return self.key.id_hash(data)
@@ -126,7 +123,7 @@ class RepoObj1:  # legacy
         assert compress or size is not None
         if compress:
             assert size is None
-            data_compressed = self.compressor.compress(data)  # TODO: compressor also adds compressor type/level bytes
+            meta, data_compressed = self.compressor.compress(meta, data)
         else:
             assert isinstance(size, int)
             data_compressed = data  # is already compressed, must include type/level bytes
@@ -136,17 +133,16 @@ class RepoObj1:  # legacy
     def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
-        meta = {}
         data_compressed = self.key.decrypt(id, cdata)
-        meta["csize"] = len(data_compressed)
         compressor_cls, compression_level = Compressor.detect(data_compressed[:2])
-        compressor = compressor_cls(level=compression_level)
-        meta["ctype"] = compressor.ID[0]
-        meta["clevel"] = compressor.level
+        compressor = compressor_cls(level=compression_level, legacy_mode=True)
         if decompress:
-            data = compressor.decompress(data_compressed)  # TODO: decompressor still needs type/level bytes
+            meta, data = compressor.decompress(None, data_compressed)
             self.key.assert_id(id, data)
-            meta["size"] = len(data)
         else:
+            meta = {}
+            meta["ctype"] = compressor.ID
+            meta["clevel"] = compressor.level
             data = data_compressed
+        meta["csize"] = len(data_compressed)
         return meta, data

+ 84 - 57
src/borg/testsuite/compress.py

@@ -29,19 +29,19 @@ def test_get_compressor():
 
 def test_cnull():
     c = get_compressor(name="none")
-    cdata = c.compress(data)
-    assert len(cdata) > len(data)
+    meta, cdata = c.compress({}, data)
+    assert len(cdata) >= len(data)
     assert data in cdata  # it's not compressed and just in there 1:1
-    assert data == c.decompress(cdata)
-    assert data == Compressor(**params).decompress(cdata)  # autodetect
+    assert data == c.decompress(meta, cdata)[1]
+    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 def test_lz4():
     c = get_compressor(name="lz4")
-    cdata = c.compress(data)
+    meta, cdata = c.compress({}, data)
     assert len(cdata) < len(data)
-    assert data == c.decompress(cdata)
-    assert data == Compressor(**params).decompress(cdata)  # autodetect
+    assert data == c.decompress(meta, cdata)[1]
+    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 def test_lz4_buffer_allocation(monkeypatch):
@@ -51,56 +51,56 @@ def test_lz4_buffer_allocation(monkeypatch):
     data = os.urandom(5 * 2**20) * 10  # 50MiB badly compressible data
     assert len(data) == 50 * 2**20
     c = Compressor("lz4")
-    cdata = c.compress(data)
-    assert len(cdata) > len(data)
-    assert data == c.decompress(cdata)
+    meta, cdata = c.compress({}, data)
+    assert len(cdata) >= len(data)
+    assert data == c.decompress(meta, cdata)[1]
 
 
 def test_zlib():
     c = get_compressor(name="zlib")
-    cdata = c.compress(data)
+    meta, cdata = c.compress({}, data)
     assert len(cdata) < len(data)
-    assert data == c.decompress(cdata)
-    assert data == Compressor(**params).decompress(cdata)  # autodetect
+    assert data == c.decompress(meta, cdata)[1]
+    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 def test_lzma():
     if lzma is None:
         pytest.skip("No lzma support found.")
     c = get_compressor(name="lzma")
-    cdata = c.compress(data)
+    meta, cdata = c.compress({}, data)
     assert len(cdata) < len(data)
-    assert data == c.decompress(cdata)
-    assert data == Compressor(**params).decompress(cdata)  # autodetect
+    assert data == c.decompress(meta, cdata)[1]
+    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 def test_zstd():
     c = get_compressor(name="zstd")
-    cdata = c.compress(data)
+    meta, cdata = c.compress({}, data)
     assert len(cdata) < len(data)
-    assert data == c.decompress(cdata)
-    assert data == Compressor(**params).decompress(cdata)  # autodetect
+    assert data == c.decompress(meta, cdata)[1]
+    assert data == Compressor(**params).decompress(meta, cdata)[1]  # autodetect
 
 
 def test_autodetect_invalid():
     with pytest.raises(ValueError):
-        Compressor(**params).decompress(b"\xff\xfftotalcrap")
+        Compressor(**params, legacy_mode=True).decompress({}, b"\xff\xfftotalcrap")
     with pytest.raises(ValueError):
-        Compressor(**params).decompress(b"\x08\x00notreallyzlib")
+        Compressor(**params, legacy_mode=True).decompress({}, b"\x08\x00notreallyzlib")
 
 
 def test_zlib_legacy_compat():
     # for compatibility reasons, we do not add an extra header for zlib,
     # nor do we expect one when decompressing / autodetecting
     for level in range(10):
-        c = get_compressor(name="zlib_legacy", level=level)
-        cdata1 = c.compress(data)
+        c = get_compressor(name="zlib_legacy", level=level, legacy_mode=True)
+        meta1, cdata1 = c.compress({}, data)
         cdata2 = zlib.compress(data, level)
         assert cdata1 == cdata2
-        data2 = c.decompress(cdata2)
-        assert data == data2
-        data2 = Compressor(**params).decompress(cdata2)
+        meta2, data2 = c.decompress({}, cdata2)
         assert data == data2
+        # _, data2 = Compressor(**params).decompress({}, cdata2)
+        # assert data == data2
 
 
 def test_compressor():
@@ -122,7 +122,17 @@ def test_compressor():
         ]
     for params in params_list:
         c = Compressor(**params)
-        assert data == c.decompress(c.compress(data))
+        meta_c, data_compressed = c.compress({}, data)
+        assert "ctype" in meta_c
+        assert "clevel" in meta_c
+        assert meta_c["csize"] == len(data_compressed)
+        assert meta_c["size"] == len(data)
+        meta_d, data_decompressed = c.decompress(meta_c, data_compressed)
+        assert data == data_decompressed
+        assert "ctype" in meta_d
+        assert "clevel" in meta_d
+        assert meta_d["csize"] == len(data_compressed)
+        assert meta_d["size"] == len(data)
 
 
 def test_auto():
@@ -130,72 +140,89 @@ def test_auto():
     compressor_lz4 = CompressionSpec("lz4").compressor
     compressor_zlib = CompressionSpec("zlib,9").compressor
     data = bytes(500)
-    compressed_auto_zlib = compressor_auto_zlib.compress(data)
-    compressed_lz4 = compressor_lz4.compress(data)
-    compressed_zlib = compressor_zlib.compress(data)
+    meta, compressed_auto_zlib = compressor_auto_zlib.compress({}, data)
+    _, compressed_lz4 = compressor_lz4.compress({}, data)
+    _, compressed_zlib = compressor_zlib.compress({}, data)
     ratio = len(compressed_zlib) / len(compressed_lz4)
-    assert Compressor.detect(compressed_auto_zlib)[0] == ZLIB if ratio < 0.99 else LZ4
+    assert meta["ctype"] == ZLIB.ID if ratio < 0.99 else LZ4.ID
+    assert meta["clevel"] == 9 if ratio < 0.99 else 255
+    assert meta["csize"] == len(compressed_auto_zlib)
 
     data = b"\x00\xb8\xa3\xa2-O\xe1i\xb6\x12\x03\xc21\xf3\x8a\xf78\\\x01\xa5b\x07\x95\xbeE\xf8\xa3\x9ahm\xb1~"
-    compressed = compressor_auto_zlib.compress(data)
-    assert Compressor.detect(compressed)[0] == CNONE
+    meta, compressed = compressor_auto_zlib.compress(dict(meta), data)
+    assert meta["ctype"] == CNONE.ID
+    assert meta["clevel"] == 255
+    assert meta["csize"] == len(compressed)
 
 
 def test_obfuscate():
     compressor = CompressionSpec("obfuscate,1,none").compressor
     data = bytes(10000)
-    compressed = compressor.compress(data)
-    # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
-    assert len(data) + 8 <= len(compressed) <= len(data) * 101 + 8
+    _, compressed = compressor.compress({}, data)
+    assert len(data) <= len(compressed) <= len(data) * 101
     # compressing 100 times the same data should give at least 50 different result sizes
-    assert len({len(compressor.compress(data)) for i in range(100)}) > 50
+    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 50
 
     cs = CompressionSpec("obfuscate,2,lz4")
     assert isinstance(cs.inner.compressor, LZ4)
     compressor = cs.compressor
     data = bytes(10000)
-    compressed = compressor.compress(data)
-    # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
+    _, compressed = compressor.compress({}, data)
     min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 1001 + 8
+    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
     # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress(data)) for i in range(100)}) > 10
+    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
 
     cs = CompressionSpec("obfuscate,6,zstd,3")
     assert isinstance(cs.inner.compressor, ZSTD)
     compressor = cs.compressor
     data = bytes(10000)
-    compressed = compressor.compress(data)
-    # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
+    _, compressed = compressor.compress({}, data)
     min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 10000001 + 8
+    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 10000001
     # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress(data)) for i in range(100)}) > 90
+    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 90
 
     cs = CompressionSpec("obfuscate,2,auto,zstd,10")
     assert isinstance(cs.inner.compressor, Auto)
     compressor = cs.compressor
     data = bytes(10000)
-    compressed = compressor.compress(data)
-    # 2 id bytes compression, 2 id bytes obfuscator. 4 length bytes
+    _, compressed = compressor.compress({}, data)
     min_compress, max_compress = 0.2, 0.001  # estimate compression factor outer boundaries
-    assert max_compress * len(data) + 8 <= len(compressed) <= min_compress * len(data) * 1001 + 8
+    assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
     # compressing 100 times the same data should give multiple different result sizes
-    assert len({len(compressor.compress(data)) for i in range(100)}) > 10
+    assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
 
     cs = CompressionSpec("obfuscate,110,none")
     assert isinstance(cs.inner.compressor, CNONE)
     compressor = cs.compressor
     data = bytes(1000)
-    compressed = compressor.compress(data)
-    # N blocks + 2 id bytes obfuscator. 4 length bytes
-    # The 'none' compressor also adds 2 id bytes
-    assert 6 + 2 + 1000 <= len(compressed) <= 6 + 2 + 1000 + 1024
+    _, compressed = compressor.compress({}, data)
+    assert 1000 <= len(compressed) <= 1000 + 1024
     data = bytes(1100)
-    compressed = compressor.compress(data)
-    # N blocks + 2 id bytes obfuscator. 4 length bytes
-    # The 'none' compressor also adds 2 id bytes
-    assert 6 + 2 + 1100 <= len(compressed) <= 6 + 2 + 1100 + 1024
+    _, compressed = compressor.compress({}, data)
+    assert 1100 <= len(compressed) <= 1100 + 1024
+
+
def test_obfuscate_meta():
    """The obfuscator must fill the meta dict correctly and not mutate input.

    Checks that compress() leaves the caller's meta dict untouched, records
    the inner compressor's ctype/clevel (lz4, level 0xFF = "unknown"), and
    reports both the overall size (csize) and the payload size (psize) so
    the all-zero obfuscation trailer can be located and stripped.
    """
    compressor = CompressionSpec("obfuscate,3,lz4").compressor
    meta_in = {}
    data = bytes(10000)
    meta_out, compressed = compressor.compress(meta_in, data)
    assert "ctype" not in meta_in  # do not modify dict of caller
    assert meta_out["ctype"] == LZ4.ID
    assert meta_out["clevel"] == 0xFF  # lz4 does not know levels
    csize = meta_out["csize"]
    assert csize == len(compressed)  # this is the overall size
    psize = meta_out["psize"]
    assert 0 < psize < 100  # lz4 shrinks the all-zero payload drastically
    assert csize - psize >= 0  # there is an obfuscation trailer
    trailer = compressed[psize:]
    assert not trailer or set(trailer) == {0}  # trailer is all-zero-bytes
 
 
 def test_compression_specs():

+ 4 - 4
src/borg/testsuite/repoobj.py

@@ -56,7 +56,7 @@ def test_format_parse_roundtrip_borg1(key):  # legacy
     edata = repo_objs.extract_crypted_data(cdata)
     compressor = repo_objs.compressor
     key = repo_objs.key
-    assert edata.startswith(bytes((key.TYPE, compressor.ID[0], compressor.level)))
+    assert edata.startswith(bytes((key.TYPE, compressor.ID, compressor.level)))
 
 
 def test_borg1_borg2_transition(key):
@@ -70,7 +70,7 @@ def test_borg1_borg2_transition(key):
     borg1_cdata = repo_objs1.format(id, meta, data)
     meta1, compr_data1 = repo_objs1.parse(id, borg1_cdata, decompress=False)  # borg transfer avoids (de)compression
     # in borg 1, we can only get this metadata after decrypting the whole chunk (and we do not have "size" here):
-    assert meta1["ctype"] == LZ4.ID[0]  # default compression
+    assert meta1["ctype"] == LZ4.ID  # default compression
     assert meta1["clevel"] == 0xFF  # lz4 does not know levels (yet?)
     assert meta1["csize"] < len_data  # lz4 should make it smaller
 
@@ -82,14 +82,14 @@ def test_borg1_borg2_transition(key):
     )
     meta2, data2 = repo_objs2.parse(id, borg2_cdata)
     assert data2 == data
-    assert meta2["ctype"] == LZ4.ID[0]
+    assert meta2["ctype"] == LZ4.ID
     assert meta2["clevel"] == 0xFF
     assert meta2["csize"] == meta1["csize"] - 2  # borg2 does not store the type/level bytes there
     assert meta2["size"] == len_data
 
     meta2 = repo_objs2.parse_meta(id, borg2_cdata)
     # now, in borg 2, we have nice and separately decrypted metadata (no need to decrypt the whole chunk):
-    assert meta2["ctype"] == LZ4.ID[0]
+    assert meta2["ctype"] == LZ4.ID
     assert meta2["clevel"] == 0xFF
     assert meta2["csize"] == meta1["csize"] - 2  # borg2 does not store the type/level bytes there
     assert meta2["size"] == len_data

+ 1 - 1
src/borg/upgrade.py

@@ -102,7 +102,7 @@ class UpgraderFrom12To20:
         # meta/data was parsed via RepoObj1.parse, which returns data **including** the ctype/clevel bytes prefixed
         def upgrade_zlib_and_level(meta, data):
             if ZLIB_legacy.detect(data):
-                ctype = ZLIB.ID[0]
+                ctype = ZLIB.ID
                 data = bytes(data)  # ZLIB_legacy has no ctype/clevel prefix
             else:
                 ctype = data[0]