# repo_compress_cmd.py
  1. import argparse
  2. from collections import defaultdict
  3. from ._common import with_repository, Highlander
  4. from ..constants import * # NOQA
  5. from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE
  6. from ..hashindex import ChunkIndex
  7. from ..helpers import sig_int, ProgressIndicatorPercent, Error
  8. from ..repository import Repository
  9. from ..remote import RemoteRepository
  10. from ..manifest import Manifest
  11. from ..logger import create_logger
  12. logger = create_logger()
  13. def find_chunks(repository, repo_objs, cache, stats, ctype, clevel, olevel):
  14. """find and flag chunks that need processing (usually: recompression)."""
  15. compr_keys = stats["compr_keys"] = set()
  16. compr_wanted = ctype, clevel, olevel
  17. recompress_count = 0
  18. for id, cie in cache.chunks.iteritems():
  19. chunk_no_data = repository.get(id, read_data=False)
  20. meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE)
  21. compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
  22. if compr_found != compr_wanted:
  23. flags_compress = cie.flags | ChunkIndex.F_COMPRESS
  24. cache.chunks[id] = cie._replace(flags=flags_compress)
  25. recompress_count += 1
  26. compr_keys.add(compr_found)
  27. stats[compr_found] += 1
  28. stats["checked_count"] += 1
  29. return recompress_count
  30. def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
  31. """process some chunks (usually: recompress)"""
  32. compr_keys = stats["compr_keys"]
  33. if compr_keys == 0: # work around defaultdict(int)
  34. compr_keys = stats["compr_keys"] = set()
  35. for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)):
  36. old_size = len(chunk)
  37. stats["old_size"] += old_size
  38. meta, data = repo_objs.parse(id, chunk, ro_type=ROBJ_DONTCARE)
  39. ro_type = meta.pop("type", None)
  40. compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
  41. if olevel == -1:
  42. # if the chunk was obfuscated, but should not be in future, remove related metadata
  43. meta.pop("olevel", None)
  44. meta.pop("psize", None)
  45. chunk = repo_objs.format(id, meta, data, ro_type=ro_type)
  46. compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
  47. if compr_done != compr_old:
  48. # we actually changed something
  49. repository.put(id, chunk, wait=False)
  50. repository.async_response(wait=False)
  51. stats["new_size"] += len(chunk)
  52. compr_keys.add(compr_done)
  53. stats[compr_done] += 1
  54. stats["recompressed_count"] += 1
  55. else:
  56. # It might be that the old chunk used compression none or lz4 (for whatever reason,
  57. # including the old compressor being a DecidingCompressor) AND we used a
  58. # DecidingCompressor now, which did NOT compress like we wanted, but decided
  59. # to use the same compression (and obfuscation) we already had.
  60. # In this case, we just keep the old chunk and do not rewrite it -
  61. # This is important to avoid rewriting such chunks **again and again**.
  62. stats["new_size"] += old_size
  63. compr_keys.add(compr_old)
  64. stats[compr_old] += 1
  65. stats["kept_count"] += 1
  66. def format_compression_spec(ctype, clevel, olevel):
  67. obfuscation = "" if olevel == -1 else f"obfuscate,{olevel},"
  68. for cname, cls in COMPRESSOR_TABLE.items():
  69. if cls.ID == ctype:
  70. cname = f"{cname}"
  71. break
  72. else:
  73. cname = f"{ctype}"
  74. clevel = f",{clevel}" if clevel != 255 else ""
  75. return obfuscation + cname + clevel
  76. class RepoCompressMixIn:
  77. @with_repository(cache=True, manifest=True, compatibility=(Manifest.Operation.CHECK,))
  78. def do_repo_compress(self, args, repository, manifest, cache):
  79. """Repository (re-)compression"""
  80. def get_csettings(c):
  81. if isinstance(c, Auto):
  82. return get_csettings(c.compressor)
  83. if isinstance(c, ObfuscateSize):
  84. ctype, clevel, _ = get_csettings(c.compressor)
  85. olevel = c.level
  86. return ctype, clevel, olevel
  87. ctype, clevel, olevel = c.ID, c.level, -1
  88. return ctype, clevel, olevel
  89. if not isinstance(repository, (Repository, RemoteRepository)):
  90. raise Error("repo-compress not supported for legacy repositories.")
  91. repo_objs = manifest.repo_objs
  92. ctype, clevel, olevel = get_csettings(repo_objs.compressor) # desired compression set by --compression
  93. stats_find = defaultdict(int)
  94. stats_process = defaultdict(int)
  95. recompress_candidate_count = find_chunks(repository, repo_objs, cache, stats_find, ctype, clevel, olevel)
  96. pi = ProgressIndicatorPercent(
  97. total=recompress_candidate_count,
  98. msg="Recompressing %3.1f%%",
  99. step=0.1,
  100. msgid="repo_compress.process_chunks",
  101. )
  102. for id, cie in cache.chunks.iteritems():
  103. if sig_int and sig_int.action_done():
  104. break
  105. if cie.flags & ChunkIndex.F_COMPRESS:
  106. process_chunks(repository, repo_objs, stats_process, [id], olevel)
  107. pi.show()
  108. pi.finish()
  109. if sig_int:
  110. # Ctrl-C / SIGINT: do not commit
  111. raise Error("Got Ctrl-C / SIGINT.")
  112. else:
  113. while repository.async_response(wait=True) is not None:
  114. pass
  115. if args.stats:
  116. print()
  117. print("Recompression stats:")
  118. print(f"Size: previously {stats_process['old_size']} -> now {stats_process['new_size']} bytes.")
  119. print(
  120. f"Change: "
  121. f"{stats_process['new_size'] - stats_process['old_size']} bytes == "
  122. f"{100.0 * stats_process['new_size'] / stats_process['old_size']:3.2f}%"
  123. )
  124. print("Found chunks stats (before processing):")
  125. for ck in stats_find["compr_keys"]:
  126. pretty_ck = format_compression_spec(*ck)
  127. print(f"{pretty_ck}: {stats_find[ck]}")
  128. print(f"Total: {stats_find['checked_count']}")
  129. print(f"Candidates for recompression: {recompress_candidate_count}")
  130. print("Processed chunks stats (after processing):")
  131. for ck in stats_process["compr_keys"]:
  132. pretty_ck = format_compression_spec(*ck)
  133. print(f"{pretty_ck}: {stats_process[ck]}")
  134. print(f"Recompressed and rewritten: {stats_process['recompressed_count']}")
  135. print(f"Kept as is: {stats_process['kept_count']}")
  136. print(f"Total: {stats_process['recompressed_count'] + stats_process['kept_count']}")
  137. def build_parser_repo_compress(self, subparsers, common_parser, mid_common_parser):
  138. from ._common import process_epilog
  139. repo_compress_epilog = process_epilog(
  140. """
  141. Repository (re-)compression (and/or re-obfuscation).
  142. Reads all chunks in the repository and recompresses them if they are not already
  143. using the compression type/level and obfuscation level given via ``--compression``.
  144. If the outcome of the chunk processing indicates a change in compression
  145. type/level or obfuscation level, the processed chunk is written to the repository.
  146. Please note that the outcome might not always be the desired compression
  147. type/level - if no compression gives a shorter output, that might be chosen.
  148. Please note that this command can not work in low (or zero) free disk space
  149. conditions.
  150. If the ``borg repo-compress`` process receives a SIGINT signal (Ctrl-C), the repo
  151. will be committed and compacted and borg will terminate cleanly afterwards.
  152. Both ``--progress`` and ``--stats`` are recommended when ``borg repo-compress``
  153. is used interactively.
  154. You do **not** need to run ``borg compact`` after ``borg repo-compress``.
  155. """
  156. )
  157. subparser = subparsers.add_parser(
  158. "repo-compress",
  159. parents=[common_parser],
  160. add_help=False,
  161. description=self.do_repo_compress.__doc__,
  162. epilog=repo_compress_epilog,
  163. formatter_class=argparse.RawDescriptionHelpFormatter,
  164. help=self.do_repo_compress.__doc__,
  165. )
  166. subparser.set_defaults(func=self.do_repo_compress)
  167. subparser.add_argument(
  168. "-C",
  169. "--compression",
  170. metavar="COMPRESSION",
  171. dest="compression",
  172. type=CompressionSpec,
  173. default=CompressionSpec("lz4"),
  174. action=Highlander,
  175. help="select compression algorithm, see the output of the " '"borg help compression" command for details.',
  176. )
  177. subparser.add_argument("-s", "--stats", dest="stats", action="store_true", help="print statistics")