瀏覽代碼

rcompress: do a repo-wide (re)compression

reads all chunks in on-disk order and recompresses them if they are not already using
the desired compression type and level (and obfuscation level).

supports SIGINT/ctrl-c and --checkpoint-interval (default: 1800s).

this is a borg command that compacts when committing (without this, it would have
a huge space usage). it commits/compacts every checkpoint interval or when
pressing ctrl-c / receiving SIGINT.
Thomas Waldmann 2 年之前
父節點
當前提交
9455c1e278

+ 2 - 0
setup.cfg

@@ -127,6 +127,7 @@ per_file_ignores =
     src/borg/archiver/help_cmd.py:E501,F405
     src/borg/archiver/key_cmds.py:F405
     src/borg/archiver/prune_cmd.py:F405
+    src/borg/archiver/rcompress_cmd.py:F405
     src/borg/archiver/recreate_cmd.py:F405
     src/borg/archiver/rdelete_cmd.py:F405
     src/borg/archiver/rlist_cmd.py:E501
@@ -163,6 +164,7 @@ per_file_ignores =
     src/borg/testsuite/archiver/extract_cmd.py:F405
     src/borg/testsuite/archiver/mount_cmds.py:E501,E722
     src/borg/testsuite/archiver/prune_cmd.py:F405
+    src/borg/testsuite/archiver/rcompress_cmd.py:F405
     src/borg/testsuite/archiver/recreate_cmd.py:F405
     src/borg/testsuite/archiver/return_codes.py:F401,F405,F811
     src/borg/testsuite/benchmark.py:F401,F811

+ 3 - 0
src/borg/archiver/__init__.py

@@ -79,6 +79,7 @@ from .list_cmd import ListMixIn
 from .lock_cmds import LocksMixIn
 from .mount_cmds import MountMixIn
 from .prune_cmd import PruneMixIn
+from .rcompress_cmd import RCompressMixIn
 from .recreate_cmd import RecreateMixIn
 from .rename_cmd import RenameMixIn
 from .rcreate_cmd import RCreateMixIn
@@ -109,6 +110,7 @@ class Archiver(
     PruneMixIn,
     RecreateMixIn,
     RenameMixIn,
+    RCompressMixIn,
     RCreateMixIn,
     RDeleteMixIn,
     RInfoMixIn,
@@ -327,6 +329,7 @@ class Archiver(
         self.build_parser_locks(subparsers, common_parser, mid_common_parser)
         self.build_parser_mount_umount(subparsers, common_parser, mid_common_parser)
         self.build_parser_prune(subparsers, common_parser, mid_common_parser)
+        self.build_parser_rcompress(subparsers, common_parser, mid_common_parser)
         self.build_parser_rcreate(subparsers, common_parser, mid_common_parser)
         self.build_parser_rdelete(subparsers, common_parser, mid_common_parser)
         self.build_parser_rinfo(subparsers, common_parser, mid_common_parser)

+ 246 - 0
src/borg/archiver/rcompress_cmd.py

@@ -0,0 +1,246 @@
+import argparse
+from collections import defaultdict
+
+from ._common import with_repository
+from ..constants import *  # NOQA
+from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE
+from ..helpers import sig_int, ProgressIndicatorPercent
+
+from ..manifest import Manifest
+
+from ..logger import create_logger
+
+logger = create_logger()
+
+
+def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
+    """find chunks that need processing (usually: recompression)."""
+    # to do it this way is maybe not obvious, thus keeping the essential design criteria here:
+    # - determine the chunk ids at one point in time (== do a **full** scan in one go) **before**
+    # writing to the repo (and especially before doing a compaction, which moves segment files around)
+    # - get the chunk ids in **on-disk order** (so we can efficiently compact while processing the chunks)
+    # - only put the ids into the list that actually need recompression (keeps it a little shorter in some cases)
+    recompress_ids = []
+    compr_keys = stats["compr_keys"] = set()
+    compr_wanted = ctype, clevel, olevel
+    state = None
+    chunks_count = len(repository)
+    chunks_limit = min(1000, max(100, chunks_count // 1000))
+    pi = ProgressIndicatorPercent(
+        total=chunks_count,
+        msg="Searching for recompression candidates %3.1f%%",
+        step=0.1,
+        msgid="rcompress.find_chunks",
+    )
+    while True:
+        chunk_ids, state = repository.scan(limit=chunks_limit, state=state)
+        if not chunk_ids:
+            break
+        for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)):
+            meta = repo_objs.parse_meta(id, chunk_no_data)
+            compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
+            if compr_found != compr_wanted:
+                recompress_ids.append(id)
+            compr_keys.add(compr_found)
+            stats[compr_found] += 1
+            stats["checked_count"] += 1
+            pi.show(increase=1)
+    pi.finish()
+    return recompress_ids
+
+
+def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
+    """process some chunks (usually: recompress)"""
+    compr_keys = stats["compr_keys"]
+    if compr_keys == 0:  # work around defaultdict(int)
+        compr_keys = stats["compr_keys"] = set()
+    for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)):
+        old_size = len(chunk)
+        stats["old_size"] += old_size
+        meta, data = repo_objs.parse(id, chunk)
+        compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
+        if olevel == -1:
+            # if the chunk was obfuscated, but should not be in future, remove related metadata
+            meta.pop("olevel", None)
+            meta.pop("psize", None)
+        chunk = repo_objs.format(id, meta, data)
+        compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
+        if compr_done != compr_old:
+            # we actually changed something
+            repository.put(id, chunk, wait=False)
+            repository.async_response(wait=False)
+            stats["new_size"] += len(chunk)
+            compr_keys.add(compr_done)
+            stats[compr_done] += 1
+            stats["recompressed_count"] += 1
+        else:
+            # It might be that the old chunk used compression none or lz4 (for whatever reason,
+            # including the old compressor being a DecidingCompressor) AND we used a
+            # DecidingCompressor now, which did NOT compress like we wanted, but decided
+            # to use the same compression (and obfuscation) we already had.
+            # In this case, we just keep the old chunk and do not rewrite it -
+            # This is important to avoid rewriting such chunks **again and again**.
+            stats["new_size"] += old_size
+            compr_keys.add(compr_old)
+            stats[compr_old] += 1
+            stats["kept_count"] += 1
+
+
+def format_compression_spec(ctype, clevel, olevel):
+    obfuscation = "" if olevel == -1 else f"obfuscate,{olevel},"
+    for cname, cls in COMPRESSOR_TABLE.items():
+        if cls.ID == ctype:
+            cname = f"{cname}"
+            break
+    else:
+        cname = f"{ctype}"
+    clevel = f",{clevel}" if clevel != 255 else ""
+    return obfuscation + cname + clevel
+
+
+class RCompressMixIn:
+    @with_repository(cache=False, manifest=True, exclusive=True, compatibility=(Manifest.Operation.CHECK,))
+    def do_rcompress(self, args, repository, manifest):
+        """Repository (re-)compression"""
+
+        def get_csettings(c):
+            if isinstance(c, Auto):
+                return get_csettings(c.compressor)
+            if isinstance(c, ObfuscateSize):
+                ctype, clevel, _ = get_csettings(c.compressor)
+                olevel = c.level
+                return ctype, clevel, olevel
+            ctype, clevel, olevel = c.ID, c.level, -1
+            return ctype, clevel, olevel
+
+        repo_objs = manifest.repo_objs
+        ctype, clevel, olevel = get_csettings(repo_objs.compressor)  # desired compression set by --compression
+
+        def checkpoint_func():
+            while repository.async_response(wait=True) is not None:
+                pass
+            repository.commit(compact=True)
+
+        stats_find = defaultdict(int)
+        stats_process = defaultdict(int)
+        recompress_ids = find_chunks(repository, repo_objs, stats_find, ctype, clevel, olevel)
+        recompress_candidate_count = len(recompress_ids)
+        chunks_limit = min(1000, max(100, recompress_candidate_count // 1000))
+        uncommitted_chunks = 0
+
+        # start a new transaction
+        data = repository.get(Manifest.MANIFEST_ID)
+        repository.put(Manifest.MANIFEST_ID, data)
+        uncommitted_chunks += 1
+
+        pi = ProgressIndicatorPercent(
+            total=len(recompress_ids), msg="Recompressing %3.1f%%", step=0.1, msgid="rcompress.process_chunks"
+        )
+        while recompress_ids:
+            if sig_int and sig_int.action_done():
+                break
+            ids, recompress_ids = recompress_ids[:chunks_limit], recompress_ids[chunks_limit:]
+            process_chunks(repository, repo_objs, stats_process, ids, olevel)
+            pi.show(increase=len(ids))
+            checkpointed = self.maybe_checkpoint(
+                checkpoint_func=checkpoint_func, checkpoint_interval=args.checkpoint_interval
+            )
+            uncommitted_chunks = 0 if checkpointed else (uncommitted_chunks + len(ids))
+        pi.finish()
+        if sig_int:
+            # Ctrl-C / SIGINT: do not checkpoint (commit) again, we already have a checkpoint in this case.
+            self.print_error("Got Ctrl-C / SIGINT.")
+        elif uncommitted_chunks > 0:
+            checkpoint_func()
+        if args.stats:
+            print()
+            print("Recompression stats:")
+            print(f"Size: previously {stats_process['old_size']} -> now {stats_process['new_size']} bytes.")
+            print(
+                f"Change: "
+                f"{stats_process['new_size'] - stats_process['old_size']} bytes == "
+                f"{100.0 * stats_process['new_size'] / stats_process['old_size']:3.2f}%"
+            )
+            print("Found chunks stats (before processing):")
+            for ck in stats_find["compr_keys"]:
+                pretty_ck = format_compression_spec(*ck)
+                print(f"{pretty_ck}: {stats_find[ck]}")
+            print(f"Total: {stats_find['checked_count']}")
+
+            print(f"Candidates for recompression: {recompress_candidate_count}")
+
+            print("Processed chunks stats (after processing):")
+            for ck in stats_process["compr_keys"]:
+                pretty_ck = format_compression_spec(*ck)
+                print(f"{pretty_ck}: {stats_process[ck]}")
+            print(f"Recompressed and rewritten: {stats_process['recompressed_count']}")
+            print(f"Kept as is: {stats_process['kept_count']}")
+            print(f"Total: {stats_process['recompressed_count'] + stats_process['kept_count']}")
+
+        return self.exit_code
+
+    def build_parser_rcompress(self, subparsers, common_parser, mid_common_parser):
+        from ._common import process_epilog
+
+        rcompress_epilog = process_epilog(
+            """
+        Repository (re-)compression (and/or re-obfuscation).
+
+        Reads all chunks in the repository (in on-disk order, this is important for
+        compaction) and recompresses them if they are not already using the compression
+        type/level and obfuscation level given via ``--compression``.
+
+        If the outcome of the chunk processing indicates a change in compression
+        type/level or obfuscation level, the processed chunk is written to the repository.
+        Please note that the outcome might not always be the desired compression
+        type/level - if no compression gives a shorter output, that might be chosen.
+
+        Every ``--checkpoint-interval``, progress is committed to the repository and
+        the repository is compacted (this is to keep temporary repo space usage in bounds).
+        A lower checkpoint interval means lower temporary repo space usage, but also
+        slower progress due to higher overhead (and vice versa).
+
+        Please note that this command can not work in low (or zero) free disk space
+        conditions.
+
+        If the ``borg rcompress``process receives a SIGINT signal (Ctrl-C), the repo
+        will be committed and compacted and borg will terminate cleanly afterwards.
+
+        Both ``--progress`` and ``--stats`` are recommended when ``borg rcompress``
+        is used interactively.
+
+        You do **not** need to run ``borg compact`` after ``borg rcompress``.
+        """
+        )
+        subparser = subparsers.add_parser(
+            "rcompress",
+            parents=[common_parser],
+            add_help=False,
+            description=self.do_rcompress.__doc__,
+            epilog=rcompress_epilog,
+            formatter_class=argparse.RawDescriptionHelpFormatter,
+            help=self.do_rcompress.__doc__,
+        )
+        subparser.set_defaults(func=self.do_rcompress)
+
+        subparser.add_argument(
+            "-C",
+            "--compression",
+            metavar="COMPRESSION",
+            dest="compression",
+            type=CompressionSpec,
+            default=CompressionSpec("lz4"),
+            help="select compression algorithm, see the output of the " '"borg help compression" command for details.',
+        )
+
+        subparser.add_argument("-s", "--stats", dest="stats", action="store_true", help="print statistics")
+
+        subparser.add_argument(
+            "-c",
+            "--checkpoint-interval",
+            metavar="SECONDS",
+            dest="checkpoint_interval",
+            type=int,
+            default=1800,
+            help="write checkpoint every SECONDS seconds (Default: 1800)",
+        )

+ 2 - 0
src/borg/compress.pyi

@@ -61,3 +61,5 @@ class ZSTD(DecidingCompressor):
 
 LZ4_COMPRESSOR: Type[LZ4]
 NONE_COMPRESSOR: Type[CNONE]
+
+COMPRESSOR_TABLE: dict

+ 11 - 4
src/borg/compress.pyx

@@ -491,21 +491,29 @@ class Auto(CompressorBase):
         return self._decide(meta, data)[0]
 
     def compress(self, meta, data):
+        def get_meta(from_meta, to_meta):
+            for key in "ctype", "clevel", "csize":
+                if key in from_meta:
+                    to_meta[key] = from_meta[key]
+
         compressor, (cheap_meta, cheap_compressed_data) = self._decide(dict(meta), data)
         if compressor in (LZ4_COMPRESSOR, NONE_COMPRESSOR):
             # we know that trying to compress with expensive compressor is likely pointless,
             # so we fallback to return the cheap compressed data.
-            return cheap_meta, cheap_compressed_data
+            get_meta(cheap_meta, meta)
+            return meta, cheap_compressed_data
         # if we get here, the decider decided to try the expensive compressor.
         # we also know that the compressed data returned by the decider is lz4 compressed.
         expensive_meta, expensive_compressed_data = compressor.compress(dict(meta), data)
         ratio = len(expensive_compressed_data) / len(cheap_compressed_data)
         if ratio < 0.99:
             # the expensive compressor managed to squeeze the data significantly better than lz4.
-            return expensive_meta, expensive_compressed_data
+            get_meta(expensive_meta, meta)
+            return meta, expensive_compressed_data
         else:
             # otherwise let's just store the lz4 data, which decompresses extremely fast.
-            return cheap_meta, cheap_compressed_data
+            get_meta(cheap_meta, meta)
+            return meta, cheap_compressed_data
 
     def decompress(self, data):
         raise NotImplementedError
@@ -558,7 +566,6 @@ class ObfuscateSize(CompressorBase):
 
     def compress(self, meta, data):
         assert not self.legacy_mode  # we never call this in legacy mode
-        meta = dict(meta)  # make a copy, do not modify caller's dict
         meta, compressed_data = self.compressor.compress(meta, data)  # compress data
         compr_size = len(compressed_data)
         assert "csize" in meta, repr(meta)

+ 83 - 0
src/borg/testsuite/archiver/rcompress_cmd.py

@@ -0,0 +1,83 @@
+import os
+from binascii import hexlify
+
+from ...constants import *  # NOQA
+from ...repository import Repository
+from ...manifest import Manifest
+from ...compress import ZSTD, ZLIB, LZ4, CNONE
+
+from . import ArchiverTestCaseBase, RK_ENCRYPTION
+
+
+class ArchiverTestCase(ArchiverTestCaseBase):
+    def test_rcompress(self):
+        def check_compression(ctype, clevel, olevel):
+            """check if all the chunks in the repo are compressed/obfuscated like expected"""
+            repository = Repository(self.repository_path, exclusive=True)
+            with repository:
+                manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
+                state = None
+                while True:
+                    ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state)
+                    if not ids:
+                        break
+                    for id in ids:
+                        chunk = repository.get(id, read_data=True)
+                        meta, data = manifest.repo_objs.parse(id, chunk)  # will also decompress according to metadata
+                        m_olevel = meta.get("olevel", -1)
+                        m_psize = meta.get("psize", -1)
+                        print(
+                            hexlify(id).decode(),
+                            meta["ctype"],
+                            meta["clevel"],
+                            meta["csize"],
+                            meta["size"],
+                            m_olevel,
+                            m_psize,
+                        )
+                        # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
+                        # (desired compressed, lz4 compressed, not compressed).
+                        assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
+                        assert meta["clevel"] in (clevel, 255)  # LZ4 and CNONE has level 255
+                        if olevel != -1:  # we expect obfuscation
+                            assert "psize" in meta
+                            assert m_olevel == olevel
+                        else:
+                            assert "psize" not in meta
+                            assert "olevel" not in meta
+
+        self.create_regular_file("file1", size=1024 * 10)
+        self.create_regular_file("file2", contents=os.urandom(1024 * 10))
+        self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
+
+        cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 3, -1
+        self.cmd(f"--repo={self.repository_location}", "create", "test", "input", "-C", f"{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 1, -1  # change compressor (and level)
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1  # only change level
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 110  # only change to obfuscated
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 112  # only change obfuscation level
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1  # change to not obfuscated
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 1, -1
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"auto,{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)
+
+        cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 2, 111
+        self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},auto,{cname},{clevel}")
+        check_compression(ctype, clevel, olevel)

+ 10 - 11
src/borg/testsuite/compress.py

@@ -206,19 +206,18 @@ def test_obfuscate():
 
 def test_obfuscate_meta():
     compressor = CompressionSpec("obfuscate,3,lz4").compressor
-    meta_in = {}
+    meta = {}
     data = bytes(10000)
-    meta_out, compressed = compressor.compress(meta_in, data)
-    assert "ctype" not in meta_in  # do not modify dict of caller
-    assert "ctype" in meta_out
-    assert meta_out["ctype"] == LZ4.ID
-    assert "clevel" in meta_out
-    assert meta_out["clevel"] == 0xFF
-    assert "csize" in meta_out
-    csize = meta_out["csize"]
+    meta, compressed = compressor.compress(meta, data)
+    assert "ctype" in meta
+    assert meta["ctype"] == LZ4.ID
+    assert "clevel" in meta
+    assert meta["clevel"] == 0xFF
+    assert "csize" in meta
+    csize = meta["csize"]
     assert csize == len(compressed)  # this is the overall size
-    assert "psize" in meta_out
-    psize = meta_out["psize"]
+    assert "psize" in meta
+    psize = meta["psize"]
     assert 0 < psize < 100
     assert csize - psize >= 0  # there is a obfuscation trailer
     trailer = compressed[psize:]