瀏覽代碼

Merge pull request #8543 from ThomasWaldmann/repo-compress-use-flags

repo-compress: reduce memory consumption
TW 6 月之前
父節點
當前提交
c92d528c82
共有 3 個文件被更改,包括 19 次插入13 次删除
  1. 17 13
      src/borg/archiver/repo_compress_cmd.py
  2. 1 0
      src/borg/hashindex.pyi
  3. 1 0
      src/borg/hashindex.pyx

+ 17 - 13
src/borg/archiver/repo_compress_cmd.py

@@ -4,6 +4,7 @@ from collections import defaultdict
 from ._common import with_repository, Highlander
 from ..constants import *  # NOQA
 from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE
+from ..hashindex import ChunkIndex
 from ..helpers import sig_int, ProgressIndicatorPercent, Error
 from ..repository import Repository
 from ..remote import RemoteRepository
@@ -15,20 +16,22 @@ logger = create_logger()
 
 
 def find_chunks(repository, repo_objs, cache, stats, ctype, clevel, olevel):
-    """find chunks that need processing (usually: recompression)."""
-    recompress_ids = []
+    """find and flag chunks that need processing (usually: recompression)."""
     compr_keys = stats["compr_keys"] = set()
     compr_wanted = ctype, clevel, olevel
-    for id, _ in cache.chunks.iteritems():
+    recompress_count = 0
+    for id, cie in cache.chunks.iteritems():
         chunk_no_data = repository.get(id, read_data=False)
         meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE)
         compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
         if compr_found != compr_wanted:
-            recompress_ids.append(id)
+            flags_compress = cie.flags | ChunkIndex.F_COMPRESS
+            cache.chunks[id] = cie._replace(flags=flags_compress)
+            recompress_count += 1
         compr_keys.add(compr_found)
         stats[compr_found] += 1
         stats["checked_count"] += 1
-    return recompress_ids
+    return recompress_count
 
 
 def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
@@ -104,19 +107,20 @@ class RepoCompressMixIn:
 
         stats_find = defaultdict(int)
         stats_process = defaultdict(int)
-        recompress_ids = find_chunks(repository, repo_objs, cache, stats_find, ctype, clevel, olevel)
-        recompress_candidate_count = len(recompress_ids)
-        chunks_limit = min(1000, max(100, recompress_candidate_count // 1000))
+        recompress_candidate_count = find_chunks(repository, repo_objs, cache, stats_find, ctype, clevel, olevel)
 
         pi = ProgressIndicatorPercent(
-            total=len(recompress_ids), msg="Recompressing %3.1f%%", step=0.1, msgid="repo_compress.process_chunks"
+            total=recompress_candidate_count,
+            msg="Recompressing %3.1f%%",
+            step=0.1,
+            msgid="repo_compress.process_chunks",
         )
-        while recompress_ids:
+        for id, cie in cache.chunks.iteritems():
             if sig_int and sig_int.action_done():
                 break
-            ids, recompress_ids = recompress_ids[:chunks_limit], recompress_ids[chunks_limit:]
-            process_chunks(repository, repo_objs, stats_process, ids, olevel)
-            pi.show(increase=len(ids))
+            if cie.flags & ChunkIndex.F_COMPRESS:
+                process_chunks(repository, repo_objs, stats_process, [id], olevel)
+            pi.show()
         pi.finish()
         if sig_int:
             # Ctrl-C / SIGINT: do not commit

+ 1 - 0
src/borg/hashindex.pyi

@@ -13,6 +13,7 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]]
 class ChunkIndex:
     F_NONE: int
     F_USED: int
+    F_COMPRESS: int
     F_NEW: int
     M_USER: int
     M_SYSTEM: int

+ 1 - 0
src/borg/hashindex.pyx

@@ -47,6 +47,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
     M_SYSTEM = 0xff000000  # mask for system flags
     # user flags:
     F_USED = 2 ** 0  # chunk is used/referenced
+    F_COMPRESS = 2 ** 1  # chunk shall get (re-)compressed
     # system flags (internal use, always 0 to user, not changeable by user):
     F_NEW = 2 ** 24  # a new chunk that is not present in repo/cache/chunks.* yet.