# archive.py

import base64
import json
import os
import stat
import sys
import time
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby, zip_longest
from shutil import get_terminal_size

from .platformflags import is_win32

from .logger import create_logger

logger = create_logger()

from . import xattr
from .chunker import get_chunker, Chunk
from .cache import ChunkListEntry
from .crypto.key import key_factory, UnsupportedPayloadError, AEADKeyBase
from .compress import Compressor, CompressionSpec
from .constants import *  # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import HardLinkManager
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .platform import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, archive_ts_now
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import os_open, flags_normal, flags_dir
from .helpers import os_stat
from .helpers import msgpack
from .helpers import sig_int
from .helpers.lrucache import LRUCache
from .manifest import Manifest
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT
from .repoobj import RepoObj

has_link = hasattr(os, "link")


class Statistics:
    def __init__(self, output_json=False, iec=False):
        self.output_json = output_json
        self.iec = iec
        self.osize = self.usize = self.nfiles = 0
        self.osize_parts = self.usize_parts = self.nfiles_parts = 0
        self.last_progress = 0  # timestamp when last progress was shown
        self.files_stats = defaultdict(int)
        self.chunking_time = 0.0
        self.hashing_time = 0.0
        self.rx_bytes = 0
        self.tx_bytes = 0

    def update(self, size, unique, part=False):
        if not part:
            self.osize += size
            if unique:
                self.usize += size
        else:
            self.osize_parts += size
            if unique:
                self.usize_parts += size

    def __add__(self, other):
        if not isinstance(other, Statistics):
            raise TypeError("can only add Statistics objects")
        stats = Statistics(self.output_json, self.iec)
        stats.osize = self.osize + other.osize
        stats.usize = self.usize + other.usize
        stats.nfiles = self.nfiles + other.nfiles
        stats.osize_parts = self.osize_parts + other.osize_parts
        stats.usize_parts = self.usize_parts + other.usize_parts
        stats.nfiles_parts = self.nfiles_parts + other.nfiles_parts
        stats.chunking_time = self.chunking_time + other.chunking_time
        stats.hashing_time = self.hashing_time + other.hashing_time
        for key in other.files_stats:
            stats.files_stats[key] = self.files_stats[key] + other.files_stats[key]
        return stats

    def __str__(self):
        hashing_time = format_timedelta(timedelta(seconds=self.hashing_time))
        chunking_time = format_timedelta(timedelta(seconds=self.chunking_time))
        return """\
Number of files: {stats.nfiles}
Original size: {stats.osize_fmt}
Deduplicated size: {stats.usize_fmt}
Time spent in hashing: {hashing_time}
Time spent in chunking: {chunking_time}
Added files: {added_files}
Unchanged files: {unchanged_files}
Modified files: {modified_files}
Error files: {error_files}
Bytes read from remote: {stats.rx_bytes}
Bytes sent to remote: {stats.tx_bytes}
""".format(
            stats=self,
            hashing_time=hashing_time,
            chunking_time=chunking_time,
            added_files=self.files_stats["A"],
            unchanged_files=self.files_stats["U"],
            modified_files=self.files_stats["M"],
            error_files=self.files_stats["E"],
        )

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self
        )

    def as_dict(self):
        return {
            "original_size": FileSize(self.osize, iec=self.iec),
            "deduplicated_size": FileSize(self.usize, iec=self.iec),
            "nfiles": self.nfiles,
            "hashing_time": self.hashing_time,
            "chunking_time": self.chunking_time,
            "files_stats": self.files_stats,
        }

    def as_raw_dict(self):
        return {
            "size": self.osize,
            "nfiles": self.nfiles,
            "size_parts": self.osize_parts,
            "nfiles_parts": self.nfiles_parts,
        }

    @classmethod
    def from_raw_dict(cls, **kw):
        self = cls()
        self.osize = kw["size"]
        self.nfiles = kw["nfiles"]
        self.osize_parts = kw["size_parts"]
        self.nfiles_parts = kw["nfiles_parts"]
        return self

    @property
    def osize_fmt(self):
        return format_file_size(self.osize, iec=self.iec)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize, iec=self.iec)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                if not final:
                    data = self.as_dict()
                    data["path"] = remove_surrogates(item.path if item else "")
                else:
                    data = {}
                data.update({"time": time.time(), "type": "archive_progress", "finished": final})
                msg = json.dumps(data)
                end = "\n"
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = "{0.osize_fmt} O {0.usize_fmt} U {0.nfiles} N ".format(self)
                    path = remove_surrogates(item.path) if item else ""
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ""
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = " " * columns
                end = "\r"
            print(msg, end=end, file=stream or sys.stderr, flush=True)
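

# Illustrative sketch (not part of the original module): how the Statistics counters are
# meant to be fed and combined. "unique" means the chunk was newly added to the repository,
# so it counts towards the deduplicated size; part files are tracked in separate counters.
def _example_statistics_usage():  # hypothetical helper, for illustration only
    total = Statistics()
    per_archive = Statistics()
    per_archive.update(size=4096, unique=True)  # new chunk: osize and usize grow
    per_archive.update(size=4096, unique=False)  # duplicate chunk: only osize grows
    per_archive.nfiles += 1
    total = total + per_archive  # __add__ merges counters and files_stats
    return total.osize, total.usize, total.nfiles  # -> (8192, 4096, 1)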


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupError(Exception):
    """
    Exception raised for non-OSError-based exceptions while accessing backup files.
    """


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """

    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        if self.op:
            return f"{self.op}: {self.os_error}"
        else:
            return str(self.os_error)


class BackupIO:
    op = ""

    def __call__(self, op=""):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val


backup_io = BackupIO()


def backup_io_iter(iterator):
    backup_io.op = "read"
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item
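

# Illustrative sketch (not part of the original module): the intended use of the backup_io
# context manager. OSErrors raised while touching *input* files inside the "with" block are
# wrapped into BackupOSError, so callers can treat them as a per-file warning instead of a
# fatal error; IO errors outside such a block stay critical and abort.
def _example_backup_io_usage(path):  # hypothetical helper, for illustration only
    try:
        with backup_io("open"):
            fd = os.open(path, os.O_RDONLY)
        os.close(fd)
    except BackupOSError as err:
        # non-critical: record the failure and continue with the next file
        logger.warning("%s: %s", path, err)
        return False
    return True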


def stat_update_check(st_old, st_curr):
    """
    this checks for some race conditions between the first filename-based stat()
    we did before dispatching to the (hopefully correct) file type backup handler
    and the (hopefully) fd-based fstat() we did in the handler.

    if there is a problematic difference (e.g. file type changed), we rather
    skip the file than being tricked into a security problem.

    such races should only happen if:
    - we are backing up a live filesystem (no snapshot, not inactive)
    - if files change due to normal fs activity at an unfortunate time
    - if somebody is doing an attack against us
    """
    # assuming that a file type change implicates a different inode change AND that inode numbers
    # are not duplicate in a short timeframe, this check is redundant and solved by the ino check:
    if stat.S_IFMT(st_old.st_mode) != stat.S_IFMT(st_curr.st_mode):
        # in this case, we dispatched to wrong handler - abort
        raise BackupError("file type changed (race condition), skipping file")
    if st_old.st_ino != st_curr.st_ino:
        # in this case, the hardlinks-related code in create_helper has the wrong inode - abort!
        raise BackupError("file inode changed (race condition), skipping file")
    # looks ok, we are still dealing with the same thing - return current stat:
    return st_curr
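

# Illustrative sketch (not part of the original module): stat_update_check() compares a
# filename-based stat() taken before dispatching with a later fd-based fstat() of the
# (hopefully) same object and raises BackupError if file type or inode changed in between.
def _example_stat_update_check(path):  # hypothetical helper, for illustration only
    st_before = os.stat(path, follow_symlinks=False)
    fd = os.open(path, os.O_RDONLY)
    try:
        st_now = stat_update_check(st_before, os.fstat(fd))  # raises BackupError on a race
    finally:
        os.close(fd)
    return st_now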


@contextmanager
def OsOpen(*, flags, path=None, parent_fd=None, name=None, noatime=False, op="open"):
    with backup_io(op):
        fd = os_open(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=noatime)
    try:
        yield fd
    finally:
        # On windows fd is None for directories.
        if fd is not None:
            os.close(fd)
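

# Illustrative sketch (not part of the original module): OsOpen() wraps os_open() in
# backup_io (so OSErrors become BackupOSError) and guarantees the fd gets closed; the fd
# can be None for directories on Windows, so callers must tolerate that.
def _example_os_open(path, parent_fd, name):  # hypothetical helper, for illustration only
    with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
        return os.fstat(fd) if fd is not None else None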


class DownloadPipeline:
    def __init__(self, repository, repo_objs):
        self.repository = repository
        self.repo_objs = repo_objs

    def unpack_many(self, ids, *, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        hlids_preloaded = set()
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
                if "chunks" in item:
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if "chunks" in item:
                        hlid = item.get("hlid", None)
                        if hlid is None:
                            preload_chunks = True
                        else:
                            if hlid in hlids_preloaded:
                                preload_chunks = False
                            else:
                                # not having the hardlink's chunks already preloaded for other hardlink to same inode
                                preload_chunks = True
                                hlids_preloaded.add(hlid)
                        if preload_chunks:
                            self.repository.preload([c.id for c in item.chunks])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            _, data = self.repo_objs.parse(id_, cdata)
            yield data
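

# Illustrative sketch (not part of the original module): consuming an item stream through
# DownloadPipeline. If preload=True is used, the data chunks of every yielded item must be
# fetched with is_preloaded=True, otherwise preloaded chunks pile up in RemoteRepository.
def _example_download_pipeline(repository, repo_objs, item_stream_ids):  # hypothetical helper
    pipeline = DownloadPipeline(repository, repo_objs)
    sizes = {}
    for item in pipeline.unpack_many(item_stream_ids, filter=lambda it: "chunks" in it, preload=True):
        total = 0
        for data in pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
            total += len(data)
        sizes[item.path] = total
    return sizes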


class ChunkBuffer:
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer()
        self.chunks = []
        self.key = key
        self.chunker = get_chunker(*chunker_params, seed=self.key.chunk_seed, sparse=False)

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        # the metadata stream may produce all-zero chunks, so deal
        # with CH_ALLOC (and CH_HOLE, for completeness) here.
        chunks = []
        for chunk in self.chunker.chunkify(self.buffer):
            alloc = chunk.meta["allocation"]
            if alloc == CH_DATA:
                data = bytes(chunk.data)
            elif alloc in (CH_ALLOC, CH_HOLE):
                data = zeros[: chunk.meta["size"]]
            else:
                raise ValueError("chunk allocation has unsupported value of %r" % alloc)
            chunks.append(data)
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
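

# Illustrative sketch (not part of the original module): ChunkBuffer is abstract over
# write_chunk(); CacheChunkBuffer below stores chunks via the Cache, while this minimal
# subclass just keeps the raw chunk bytes in memory, keyed by their id_hash.
class _InMemoryChunkBuffer(ChunkBuffer):  # hypothetical subclass, for illustration only
    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.raw_chunks = {}

    def write_chunk(self, chunk):
        id_ = self.key.id_hash(chunk)
        self.raw_chunks[id_] = bytes(chunk)
        return id_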


class CacheChunkBuffer(ChunkBuffer):
    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
        self.cache.repository.async_response(wait=False)
        return id_


def get_item_uid_gid(item, *, numeric, uid_forced=None, gid_forced=None, uid_default=0, gid_default=0):
    if uid_forced is not None:
        uid = uid_forced
    else:
        uid = None if numeric else user2uid(item.get("user"))
        uid = item.uid if uid is None else uid
        if uid < 0:
            uid = uid_default
    if gid_forced is not None:
        gid = gid_forced
    else:
        gid = None if numeric else group2gid(item.get("group"))
        gid = item.gid if gid is None else gid
        if gid < 0:
            gid = gid_default
    return uid, gid
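

# Illustrative sketch (not part of the original module): how get_item_uid_gid() resolves
# ownership. With numeric=False the archived user/group *names* win, then the archived
# numeric ids, then the defaults (used when the stored ids are negative, i.e. unknown).
# uid_forced/gid_forced short-circuit everything, e.g. for a "restore as this user" mode.
def _example_resolve_ownership(item, *, numeric_ids, forced_uid=None, forced_gid=None):
    # hypothetical helper, for illustration only
    return get_item_uid_gid(
        item, numeric=numeric_ids, uid_forced=forced_uid, gid_forced=forced_gid, uid_default=0, gid_default=0
    )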


def archive_get_items(metadata, *, repo_objs, repository):
    if "item_ptrs" in metadata:  # looks like a v2+ archive
        assert "items" not in metadata
        items = []
        for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
            _, data = repo_objs.parse(id, cdata)
            ids = msgpack.unpackb(data)
            items.extend(ids)
        return items

    if "items" in metadata:  # legacy, v1 archive
        assert "item_ptrs" not in metadata
        return metadata.items


def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_reference=None):
    """gets a (potentially large) list of archive metadata stream chunk ids and writes them to repo objects"""
    item_ptrs = []
    for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
        data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
        id = repo_objs.id_hash(data)
        if cache is not None and stats is not None:
            cache.add_chunk(id, {}, data, stats=stats)
        elif add_reference is not None:
            cdata = repo_objs.format(id, {}, data)
            add_reference(id, len(data), cdata)
        else:
            raise NotImplementedError
        item_ptrs.append(id)
    return item_ptrs
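

# Illustrative sketch (not part of the original module): the layout produced by
# archive_put_items() and consumed by archive_get_items(). The (potentially huge) list of
# item-stream chunk ids is split into groups of IDS_PER_CHUNK, each group is msgpacked into
# one repo object, and only the (short) list of pointer ids goes into the archive item.
def _example_item_ptrs_roundtrip(chunk_ids):  # hypothetical helper, for illustration only
    packed_groups = [
        msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK]) for i in range(0, len(chunk_ids), IDS_PER_CHUNK)
    ]
    # archive_get_items() essentially does the reverse: unpack each group and concatenate.
    restored = []
    for data in packed_groups:
        restored.extend(msgpack.unpackb(data))
    assert restored == list(chunk_ids)
    return len(packed_groups)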


class Archive:
    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(
        self,
        manifest,
        name,
        cache=None,
        create=False,
        checkpoint_interval=1800,
        numeric_ids=False,
        noatime=False,
        noctime=False,
        noflags=False,
        noacls=False,
        noxattrs=False,
        progress=False,
        chunker_params=CHUNKER_PARAMS,
        start=None,
        start_monotonic=None,
        end=None,
        consider_part_files=False,
        log_json=False,
        iec=False,
    ):
        self.cwd = os.getcwd()
        assert isinstance(manifest, Manifest)
        self.manifest = manifest
        self.key = manifest.repo_objs.key
        self.repo_objs = manifest.repo_objs
        self.repository = manifest.repository
        self.cache = cache
        self.stats = Statistics(output_json=log_json, iec=iec)
        self.iec = iec
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.checkpoint_interval = checkpoint_interval
        self.numeric_ids = numeric_ids
        self.noatime = noatime
        self.noctime = noctime
        self.noflags = noflags
        self.noacls = noacls
        self.noxattrs = noxattrs
        assert (start is None) == (
            start_monotonic is None
        ), "Logic error: if start is given, start_monotonic must be given as well and vice versa."
        if start is None:
            start = archive_ts_now()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = archive_ts_now()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.repo_objs)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            i = 0
            while True:
                self.checkpoint_name = "{}.checkpoint{}".format(name, i and (".%d" % i) or "")
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)

    def _load_meta(self, id):
        cdata = self.repository.get(id)
        _, data = self.repo_objs.parse(id, cdata)
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
        if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
            raise Exception("Unknown archive metadata version")
        # note: metadata.items must not get written to disk!
        metadata.items = archive_get_items(metadata, repo_objs=self.repo_objs, repository=self.repository)
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.name = self.metadata.name
        self.comment = self.metadata.get("comment", "")

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get("time_end") or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start
            end = self.end
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            "name": self.name,
            "id": self.fpr,
            "start": OutputTimestamp(start),
            "end": OutputTimestamp(end),
            "duration": (end - start).total_seconds(),
            "stats": stats.as_dict(),
        }
        if self.create:
            info["command_line"] = sys.argv
        else:
            info.update(
                {
                    "command_line": self.metadata.cmdline,
                    "hostname": self.metadata.hostname,
                    "username": self.metadata.username,
                    "comment": self.metadata.get("comment", ""),
                    "chunker_params": self.metadata.get("chunker_params", ""),
                }
            )
        return info

    def __str__(self):
        return """\
Repository: {location}
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
""".format(
            self,
            start=OutputTimestamp(self.start),
            end=OutputTimestamp(self.end),
            location=self.repository._location.canonical_path(),
        )

    def __repr__(self):
        return "Archive(%r)" % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and "part" in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, preload=False):
        # note: when calling this with preload=True, later fetch_many() must be called with
        # is_preloaded=True or the RemoteRepository code will leak memory!
        for item in self.pipeline.unpack_many(
            self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
        ):
            yield item

    def add_item(self, item, show_progress=True, stats=None):
        if show_progress and self.show_progress:
            if stats is None:
                stats = self.stats
            stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        item_ptrs = archive_put_items(
            self.items_buffer.chunks, repo_objs=self.repo_objs, cache=self.cache, stats=self.stats
        )
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = archive_ts_now()
            start = end - duration
        else:
            start = timestamp
            end = start + duration
        self.start = start
        self.end = end
        metadata = {
            "version": 2,
            "name": name,
            "comment": comment or "",
            "item_ptrs": item_ptrs,  # see #1473
            "cmdline": sys.argv,
            "hostname": hostname,
            "username": getuser(),
            "time": start.isoformat(timespec="microseconds"),
            "time_end": end.isoformat(timespec="microseconds"),
            "chunker_params": self.chunker_params,
        }
        # we always want to create archives with the addtl. metadata (nfiles, etc.),
        # because borg info relies on them. so, either use the given stats (from args)
        # or fall back to self.stats if it was not given.
        stats = stats or self.stats
        metadata.update(
            {
                "size": stats.osize,
                "nfiles": stats.nfiles,
                "size_parts": stats.osize_parts,
                "nfiles_parts": stats.nfiles_parts,
            }
        )
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
        self.id = self.repo_objs.id_hash(data)
        try:
            self.cache.add_chunk(self.id, {}, data, stats=self.stats)
        except IntegrityError as err:
            err_msg = str(err)
            # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
            if "More than allowed put data" in err_msg:
                raise Error("%s - archive too big (issue #1473)!" % err_msg)
            else:
                raise
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit(compact=False)
        self.cache.commit()

    def calc_stats(self, cache, want_unique=True):
        if not want_unique:
            unique_size = 0
        else:

            def add(id):
                entry = cache.chunks[id]
                archive_index.add(id, 1, entry.size)

            archive_index = ChunkIndex()
            sync = CacheSynchronizer(archive_index)
            add(self.id)
            # we must escape any % char in the archive name, because we use it in a format string, see #6500
            arch_name_escd = self.name.replace("%", "%%")
            pi = ProgressIndicatorPercent(
                total=len(self.metadata.items),
                msg="Calculating statistics for archive %s ... %%3.0f%%%%" % arch_name_escd,
                msgid="archive.calc_stats",
            )
            for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
                pi.show(increase=1)
                add(id)
                _, data = self.repo_objs.parse(id, chunk)
                sync.feed(data)
            unique_size = archive_index.stats_against(cache.chunks)[1]
            pi.finish()

        stats = Statistics(iec=self.iec)
        stats.usize = unique_size  # the part files use same chunks as the full file
        stats.nfiles = self.metadata.nfiles
        stats.osize = self.metadata.size
        if self.consider_part_files:
            stats.nfiles += self.metadata.nfiles_parts
            stats.osize += self.metadata.size_parts
        return stats

    @contextmanager
    def extract_helper(self, item, path, hlm, *, dry_run=False):
        hardlink_set = False
        # Hard link?
        if "hlid" in item:
            link_target = hlm.retrieve(id=item.hlid)
            if link_target is not None and has_link:
                if not dry_run:
                    # another hardlink to same inode (same hlid) was extracted previously, just link to it
                    with backup_io("link"):
                        os.link(link_target, path, follow_symlinks=False)
                hardlink_set = True
        yield hardlink_set
        if not hardlink_set:
            if "hlid" in item and has_link:
                # Update entry with extracted item path, so that following hardlinks don't extract twice.
                # We have hardlinking support, so we will hardlink not extract.
                hlm.remember(id=item.hlid, info=path)
            else:
                # Broken platform with no hardlinking support.
                # In this case, we *want* to extract twice, because there is no other way.
                pass

    def extract_item(
        self,
        item,
        restore_attrs=True,
        dry_run=False,
        stdout=False,
        sparse=False,
        hlm=None,
        stripped_components=0,
        original_path=None,
        pi=None,
    ):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hlm: maps hlid to link_target for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        has_damaged_chunks = "chunks_healthy" in item
        if dry_run or stdout:
            with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set:
                if not hardlink_set:
                    # it does not really set hardlinks due to dry_run, but we need to behave same
                    # as non-dry_run concerning fetching preloaded chunks from the pipeline or
                    # it would get stuck.
                    if "chunks" in item:
                        item_chunks_size = 0
                        for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                            if pi:
                                pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                            if stdout:
                                sys.stdout.buffer.write(data)
                            item_chunks_size += len(data)
                        if stdout:
                            sys.stdout.buffer.flush()
                        if "size" in item:
                            item_size = item.size
                            if item_size != item_chunks_size:
                                raise BackupError(
                                    "Size inconsistency detected: size {}, chunks size {}".format(
                                        item_size, item_chunks_size
                                    )
                                )
                        if has_damaged_chunks:
                            raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(("/", "../")):
            raise Exception("Path should be relative and local")
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io("makedirs"):
                make_parent(path)
            with self.extract_helper(item, path, hlm) as hardlink_set:
                if hardlink_set:
                    return
                with backup_io("open"):
                    fd = open(path, "wb")
                with fd:
                    ids = [c.id for c in item.chunks]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io("write"):
                            if sparse and zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io("truncate_and_attrs"):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if "size" in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError(
                            "Size inconsistency detected: size {}, chunks size {}".format(item_size, item_chunks_size)
                        )
                if has_damaged_chunks:
                    raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        # unusual, but possible: this is a hardlinked symlink.
                        return
                    source = item.source
                    try:
                        os.symlink(source, path)
                    except UnicodeEncodeError:
                        raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                    self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception("Unknown archive item type %r" % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = "attrs"
        uid, gid = get_item_uid_gid(item, numeric=self.numeric_ids)
        # This code is a bit of a mess due to os specific differences
        if not is_win32:
            try:
                if fd:
                    os.fchown(fd, uid, gid)
                else:
                    os.chown(path, uid, gid, follow_symlinks=False)
            except OSError:
                pass
            if fd:
                os.fchmod(fd, item.mode)
            else:
                # To check whether a particular function in the os module accepts False for its
                # follow_symlinks parameter, the in operator on supports_follow_symlinks should be
                # used. However, os.chmod is special as some platforms without a working lchmod() do
                # have fchmodat(), which has a flag that makes it behave like lchmod(). fchmodat()
                # is ignored when deciding whether or not os.chmod should be set in
                # os.supports_follow_symlinks. Work around this by using try/except.
                try:
                    os.chmod(path, item.mode, follow_symlinks=False)
                except NotImplementedError:
                    if not symlink:
                        os.chmod(path, item.mode)
            if not self.noacls:
                acl_set(path, item, self.numeric_ids, fd=fd)
            if not self.noxattrs and "xattrs" in item:
                # chown removes Linux capabilities, so set the extended attributes at the end, after chown,
                # since they include the Linux capabilities in the "security.capability" attribute.
                warning = xattr.set_all(fd or path, item.xattrs, follow_symlinks=False)
                if warning:
                    set_ec(EXIT_WARNING)
            # set timestamps rather late
            mtime = item.mtime
            atime = item.atime if "atime" in item else mtime
            if "birthtime" in item:
                birthtime = item.birthtime
                try:
                    # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms.
                    # See utimes(2) on either of the BSDs for details.
                    if fd:
                        os.utime(fd, None, ns=(atime, birthtime))
                    else:
                        os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False)
                except OSError:
                    # some systems don't support calling utime on a symlink
                    pass
            try:
                if fd:
                    os.utime(fd, None, ns=(atime, mtime))
                else:
                    os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
            except OSError:
                # some systems don't support calling utime on a symlink
                pass
            # bsdflags include the immutable flag and need to be set last:
            if not self.noflags and "bsdflags" in item:
                try:
                    set_flags(path, item.bsdflags, fd=fd)
                except OSError:
                    pass

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        if "items" in metadata:
            del metadata.items
        data = msgpack.packb(metadata.as_dict())
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, {}, data, stats=self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta("name", name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        exception_ignored = object()

        def fetch_async_response(wait=True):
            try:
                return self.repository.async_response(wait=wait)
            except Repository.ObjectNotFound:
                nonlocal error
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True
                return exception_ignored  # must not return None here

        def chunk_decref(id, stats, part=False):
            try:
                self.cache.chunk_decref(id, stats, wait=False, part=part)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            else:
                fetch_async_response(wait=False)

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(
                total=len(items_ids), msg="Decrementing references %3.0f%%", msgid="archive.delete"
            )
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                _, data = self.repo_objs.parse(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if "chunks" in item:
                            part = not self.consider_part_files and "part" in item
                            for chunk_id, size in item.chunks:
                                chunk_decref(chunk_id, stats, part=part)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True

        # delete the blocks that store all the references that end up being loaded into metadata.items:
        for id in self.metadata.item_ptrs:
            chunk_decref(id, stats)

        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

        while fetch_async_response(wait=True) is not None:
            # we did async deletes, process outstanding results (== exceptions),
            # so there is nothing pending when we return and our caller wants to commit.
            pass

        if error:
            logger.warning("forced deletion succeeded, but the deleted archive was corrupted.")
            logger.warning("borg check --repair is required to free all space.")

    @staticmethod
    def compare_archives_iter(archive1, archive2, matcher=None, can_compare_chunk_ids=False):
        """
        Yields tuples with a path and an ItemDiff instance describing changes/indicating equality.

        :param matcher: PatternMatcher class to restrict results to only matching paths.
        :param can_compare_chunk_ids: Whether --chunker-params are the same for both archives.
        """

        def compare_items(item1, item2):
            return ItemDiff(
                item1,
                item2,
                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])]),
                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])]),
                can_compare_chunk_ids=can_compare_chunk_ids,
            )

        orphans_archive1 = OrderedDict()
        orphans_archive2 = OrderedDict()

        for item1, item2 in zip_longest(
            archive1.iter_items(lambda item: matcher.match(item.path)),
            archive2.iter_items(lambda item: matcher.match(item.path)),
        ):
            if item1 and item2 and item1.path == item2.path:
                yield (item1.path, compare_items(item1, item2))
                continue
            if item1:
                matching_orphan = orphans_archive2.pop(item1.path, None)
                if matching_orphan:
                    yield (item1.path, compare_items(item1, matching_orphan))
                else:
                    orphans_archive1[item1.path] = item1
            if item2:
                matching_orphan = orphans_archive1.pop(item2.path, None)
                if matching_orphan:
                    yield (matching_orphan.path, compare_items(matching_orphan, item2))
                else:
                    orphans_archive2[item2.path] = item2
        # At this point orphans_* contain items that had no matching partner in the other archive
        for added in orphans_archive2.values():
            path = added.path
            deleted_item = Item.create_deleted(path)
            yield (path, compare_items(deleted_item, added))
        for deleted in orphans_archive1.values():
            path = deleted.path
            deleted_item = Item.create_deleted(path)
            yield (path, compare_items(deleted, deleted_item))


class MetadataCollector:
    def __init__(self, *, noatime, noctime, nobirthtime, numeric_ids, noflags, noacls, noxattrs):
        self.noatime = noatime
        self.noctime = noctime
        self.numeric_ids = numeric_ids
        self.noflags = noflags
        self.noacls = noacls
        self.noxattrs = noxattrs
        self.nobirthtime = nobirthtime

    def stat_simple_attrs(self, st):
        attrs = {}
        attrs["mode"] = st.st_mode
        # borg can work with archives only having mtime (very old borg archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        attrs["mtime"] = safe_ns(st.st_mtime_ns)
        if not self.noatime:
            attrs["atime"] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs["ctime"] = safe_ns(st.st_ctime_ns)
        if not self.nobirthtime and hasattr(st, "st_birthtime"):
            # sadly, there's no stat_result.st_birthtime_ns
            attrs["birthtime"] = safe_ns(int(st.st_birthtime * 10**9))
        attrs["uid"] = st.st_uid
        attrs["gid"] = st.st_gid
        if not self.numeric_ids:
            user = uid2user(st.st_uid)
            if user is not None:
                attrs["user"] = user
            group = gid2group(st.st_gid)
            if group is not None:
                attrs["group"] = group
        return attrs

    def stat_ext_attrs(self, st, path, fd=None):
        attrs = {}
        if not self.noflags:
            with backup_io("extended stat (flags)"):
                flags = get_flags(path, st, fd=fd)
            attrs["bsdflags"] = flags
        if not self.noxattrs:
            with backup_io("extended stat (xattrs)"):
                xattrs = xattr.get_all(fd or path, follow_symlinks=False)
            attrs["xattrs"] = StableDict(xattrs)
        if not self.noacls:
            with backup_io("extended stat (ACLs)"):
                acl_get(path, attrs, st, self.numeric_ids, fd=fd)
        return attrs

    def stat_attrs(self, st, path, fd=None):
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path, fd=fd))
        return attrs
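

# Illustrative sketch (not part of the original module): collecting item metadata for one
# filesystem object with MetadataCollector. The no* flags switch individual attribute
# groups off; stat_attrs() merges the simple stat fields with flags/xattrs/ACLs. Here the
# extended attribute groups are disabled, so only the plain stat fields are returned.
def _example_collect_metadata(path):  # hypothetical helper, for illustration only
    collector = MetadataCollector(
        noatime=False, noctime=False, nobirthtime=False, numeric_ids=False, noflags=True, noacls=True, noxattrs=True
    )
    st = os.stat(path, follow_symlinks=False)
    return collector.stat_attrs(st, path)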


# remember a few recently used all-zero chunk hashes in this mapping.
# (hash_func, chunk_length) -> chunk_hash
# we play safe and have the hash_func in the mapping key, in case we
# have different hash_funcs within the same borg run.
zero_chunk_ids = LRUCache(10, dispose=lambda _: None)


def cached_hash(chunk, id_hash):
    allocation = chunk.meta["allocation"]
    if allocation == CH_DATA:
        data = chunk.data
        chunk_id = id_hash(data)
    elif allocation in (CH_HOLE, CH_ALLOC):
        size = chunk.meta["size"]
        assert size <= len(zeros)
        data = memoryview(zeros)[:size]
        try:
            chunk_id = zero_chunk_ids[(id_hash, size)]
        except KeyError:
            chunk_id = id_hash(data)
            zero_chunk_ids[(id_hash, size)] = chunk_id
    else:
        raise ValueError("unexpected allocation type")
    return chunk_id, data


class ChunksProcessor:
    # Processes an iterator of chunks for an Item
    def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, rechunkify):
        self.key = key
        self.cache = cache
        self.add_item = add_item
        self.write_checkpoint = write_checkpoint
        self.checkpoint_interval = checkpoint_interval
        self.last_checkpoint = time.monotonic()
        self.rechunkify = rechunkify

    def write_part_file(self, item, from_chunk, number):
        item = Item(internal_dict=item.as_dict())
        length = len(item.chunks)
        # the item should only have the *additional* chunks we processed after the last partial item:
        item.chunks = item.chunks[from_chunk:]
        # for borg recreate, we already have a size member in the source item (giving the total file size),
        # but we consider only a part of the file here, thus we must recompute the size from the chunks:
        item.get_size(memorize=True, from_chunks=True)
        item.path += ".borg_part_%d" % number
        item.part = number
        number += 1
        self.add_item(item, show_progress=False)
        self.write_checkpoint()
        return length, number

    def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
        sig_int_triggered = sig_int and sig_int.action_triggered()
        if (
            forced
            or sig_int_triggered
            or self.checkpoint_interval
            and time.monotonic() - self.last_checkpoint > self.checkpoint_interval
        ):
            if sig_int_triggered:
                logger.info("checkpoint requested: starting checkpoint creation...")
            from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
            self.last_checkpoint = time.monotonic()
            if sig_int_triggered:
                sig_int.action_completed()
                logger.info("checkpoint requested: finished checkpoint creation!")
        return from_chunk, part_number

    def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
        if not chunk_processor:

            def chunk_processor(chunk):
                started_hashing = time.monotonic()
                chunk_id, data = cached_hash(chunk, self.key.id_hash)
                stats.hashing_time += time.monotonic() - started_hashing
                chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

        item.chunks = []
        # if we rechunkify, we'll get a fundamentally different chunks list, thus we need
        # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
        if self.rechunkify and "chunks_healthy" in item:
            del item.chunks_healthy
        from_chunk = 0
        part_number = 1
        for chunk in chunk_iter:
            item.chunks.append(chunk_processor(chunk))
            if show_progress:
                stats.show_progress(item=item, dt=0.2)
            from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
        else:
            if part_number > 1:
                if item.chunks[from_chunk:]:
                    # if we already have created a part item inside this file, we want to put the final
                    # chunks (if any) into a part item also (so all parts can be concatenated to get
                    # the complete file):
                    from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)
                # if we created part files, we have referenced all chunks from the part files,
                # but we also will reference the same chunks also from the final, complete file:
                for chunk in item.chunks:
                    cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
                stats.nfiles_parts += part_number - 1


class FilesystemObjectProcessors:
    # When ported to threading, then this doesn't need chunker, cache, key any more.
    # write_checkpoint should then be in the item buffer,
    # and process_file becomes a callback passed to __init__.

    def __init__(
        self,
        *,
        metadata_collector,
        cache,
        key,
        add_item,
        process_file_chunks,
        chunker_params,
        show_progress,
        sparse,
        log_json,
        iec,
        file_status_printer=None,
    ):
        self.metadata_collector = metadata_collector
        self.cache = cache
        self.key = key
        self.add_item = add_item
        self.process_file_chunks = process_file_chunks
        self.show_progress = show_progress
        self.print_file_status = file_status_printer or (lambda *args: None)

        self.hlm = HardLinkManager(id_type=tuple, info_type=(list, type(None)))  # (dev, ino) -> chunks or None
        self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
        self.cwd = os.getcwd()
        self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=sparse)

    @contextmanager
    def create_helper(self, path, st, status=None, hardlinkable=True):
        safe_path = make_path_safe(path)
        item = Item(path=safe_path)
        hardlinked = hardlinkable and st.st_nlink > 1
        update_map = False
        if hardlinked:
            status = "h"  # hardlink
            nothing = object()
            chunks = self.hlm.retrieve(id=(st.st_ino, st.st_dev), default=nothing)
            if chunks is nothing:
                update_map = True
            elif chunks is not None:
                item.chunks = chunks
            item.hlid = self.hlm.hardlink_id_from_inode(ino=st.st_ino, dev=st.st_dev)
        yield item, status, hardlinked
        self.add_item(item, stats=self.stats)
        if update_map:
            # remember the hlid of this fs object and if the item has chunks,
            # also remember them, so we do not have to re-chunk a hardlink.
            chunks = item.chunks if "chunks" in item else None
            self.hlm.remember(id=(st.st_ino, st.st_dev), info=chunks)

    def process_dir_with_fd(self, *, path, fd, st):
        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
            item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
            return status

    def process_dir(self, *, path, parent_fd, name, st):
        with self.create_helper(path, st, "d", hardlinkable=False) as (item, status, hardlinked):
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir, noatime=True, op="dir_open") as fd:
                # fd is None for directories on windows, in that case a race condition check is not possible.
                if fd is not None:
                    with backup_io("fstat"):
                        st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_fifo(self, *, path, parent_fd, name, st):
        with self.create_helper(path, st, "f") as (item, status, hardlinked):  # fifo
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
                with backup_io("fstat"):
                    st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_dev(self, *, path, parent_fd, name, st, dev_type):
        with self.create_helper(path, st, dev_type) as (item, status, hardlinked):  # char/block device
  1244. # looks like we can not work fd-based here without causing issues when trying to open/close the device
  1245. with backup_io("stat"):
  1246. st = stat_update_check(st, os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False))
  1247. item.rdev = st.st_rdev
  1248. item.update(self.metadata_collector.stat_attrs(st, path))
  1249. return status
  1250. def process_symlink(self, *, path, parent_fd, name, st):
  1251. with self.create_helper(path, st, "s", hardlinkable=True) as (item, status, hardlinked):
  1252. fname = name if name is not None and parent_fd is not None else path
  1253. with backup_io("readlink"):
  1254. source = os.readlink(fname, dir_fd=parent_fd)
  1255. item.source = source
  1256. item.update(self.metadata_collector.stat_attrs(st, path)) # can't use FD here?
  1257. return status
  1258. def process_pipe(self, *, path, cache, fd, mode, user, group):
  1259. status = "i" # stdin (or other pipe)
  1260. self.print_file_status(status, path)
  1261. status = None # we already printed the status
  1262. uid = user2uid(user)
  1263. if uid is None:
  1264. raise Error("no such user: %s" % user)
  1265. gid = group2gid(group)
  1266. if gid is None:
  1267. raise Error("no such group: %s" % group)
  1268. t = int(time.time()) * 1000000000
  1269. item = Item(
  1270. path=path,
  1271. mode=mode & 0o107777 | 0o100000, # forcing regular file mode
  1272. uid=uid,
  1273. user=user,
  1274. gid=gid,
  1275. group=group,
  1276. mtime=t,
  1277. atime=t,
  1278. ctime=t,
  1279. )
  1280. self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
  1281. item.get_size(memorize=True)
  1282. self.stats.nfiles += 1
  1283. self.add_item(item, stats=self.stats)
  1284. return status
  1285. def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
  1286. with self.create_helper(path, st, None) as (item, status, hardlinked): # no status yet
  1287. with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
  1288. with backup_io("fstat"):
  1289. st = stat_update_check(st, os.fstat(fd))
  1290. item.update(self.metadata_collector.stat_simple_attrs(st))
  1291. is_special_file = is_special(st.st_mode)
  1292. if is_special_file:
  1293. # we process a special file like a regular file. reflect that in mode,
  1294. # so it can be extracted / accessed in FUSE mount like a regular file.
  1295. # this needs to be done early, so that part files also get the patched mode.
  1296. item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
  1297. if "chunks" in item: # create_helper might have put chunks from a previous hardlink there
  1298. [cache.chunk_incref(id_, self.stats) for id_, _ in item.chunks]
  1299. else: # normal case, no "2nd+" hardlink
  1300. if not is_special_file:
  1301. hashed_path = safe_encode(os.path.join(self.cwd, path))
  1302. started_hashing = time.monotonic()
  1303. path_hash = self.key.id_hash(hashed_path)
  1304. self.stats.hashing_time += time.monotonic() - started_hashing
  1305. known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
  1306. else:
  1307. # in --read-special mode, we may be called for special files.
  1308. # there should be no information in the cache about special files processed in
  1309. # read-special mode, but we better play safe as this was wrong in the past:
  1310. hashed_path = path_hash = None
  1311. known, ids = False, None
  1312. chunks = None
  1313. if ids is not None:
  1314. # Make sure all ids are available
  1315. for id_ in ids:
  1316. if not cache.seen_chunk(id_):
  1317. status = (
  1318. "M" # cache said it is unmodified, but we lost a chunk: process file like modified
  1319. )
  1320. break
  1321. else:
  1322. chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
  1323. status = "U" # regular file, unchanged
  1324. else:
  1325. status = "M" if known else "A" # regular file, modified or added
  1326. self.print_file_status(status, path)
  1327. self.stats.files_stats[status] += 1
  1328. status = None # we already printed the status
  1329. # Only chunkify the file if needed
  1330. if chunks is not None:
  1331. item.chunks = chunks
  1332. else:
  1333. with backup_io("read"):
  1334. self.process_file_chunks(
  1335. item,
  1336. cache,
  1337. self.stats,
  1338. self.show_progress,
  1339. backup_io_iter(self.chunker.chunkify(None, fd)),
  1340. )
  1341. self.stats.chunking_time = self.chunker.chunking_time
  1342. if is_win32:
  1343. changed_while_backup = False # TODO
  1344. else:
  1345. with backup_io("fstat2"):
  1346. st2 = os.fstat(fd)
  1347. # special files:
  1348. # - fifos change naturally, because they are fed from the other side. no problem.
  1349. # - blk/chr devices don't change ctime anyway.
  1350. changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
  1351. if changed_while_backup:
  1352. status = "C" # regular file changed while we backed it up, might be inconsistent/corrupt!
  1353. if not is_special_file and not changed_while_backup:
  1354. # we must not memorize special files, because the contents of e.g. a
  1355. # block or char device will change without its mtime/size/inode changing.
  1356. # also, we must not memorize a potentially inconsistent/corrupt file that
  1357. # changed while we backed it up.
  1358. cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
  1359. self.stats.nfiles += 1
  1360. item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
  1361. item.get_size(memorize=True)
  1362. return status
  1363. class TarfileObjectProcessors:
  1364. def __init__(
  1365. self,
  1366. *,
  1367. cache,
  1368. key,
  1369. add_item,
  1370. process_file_chunks,
  1371. chunker_params,
  1372. show_progress,
  1373. log_json,
  1374. iec,
  1375. file_status_printer=None,
  1376. ):
  1377. self.cache = cache
  1378. self.key = key
  1379. self.add_item = add_item
  1380. self.process_file_chunks = process_file_chunks
  1381. self.show_progress = show_progress
  1382. self.print_file_status = file_status_printer or (lambda *args: None)
  1383. self.stats = Statistics(output_json=log_json, iec=iec) # threading: done by cache (including progress)
  1384. self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False)
  1385. self.hlm = HardLinkManager(id_type=str, info_type=list) # path -> chunks
  1386. @contextmanager
  1387. def create_helper(self, tarinfo, status=None, type=None):
  1388. ph = tarinfo.pax_headers
  1389. if ph and "BORG.item.version" in ph:
  1390. assert ph["BORG.item.version"] == "1"
  1391. meta_bin = base64.b64decode(ph["BORG.item.meta"])
  1392. meta_dict = msgpack.unpackb(meta_bin, object_hook=StableDict)
  1393. item = Item(internal_dict=meta_dict)
  1394. else:
  1395. def s_to_ns(s):
  1396. return safe_ns(int(float(s) * 1e9))
  1397. item = Item(
  1398. path=make_path_safe(tarinfo.name),
  1399. mode=tarinfo.mode | type,
  1400. uid=tarinfo.uid,
  1401. gid=tarinfo.gid,
  1402. mtime=s_to_ns(tarinfo.mtime),
  1403. )
  1404. if tarinfo.uname:
  1405. item.user = tarinfo.uname
  1406. if tarinfo.gname:
  1407. item.group = tarinfo.gname
  1408. if ph:
  1409. # note: for mtime this is a bit redundant as it is already done by tarfile module,
  1410. # but we just do it in our way to be consistent for sure.
  1411. for name in "atime", "ctime", "mtime":
  1412. if name in ph:
  1413. ns = s_to_ns(ph[name])
  1414. setattr(item, name, ns)
  1415. yield item, status
  1416. # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
  1417. self.add_item(item, stats=self.stats)
  1418. def process_dir(self, *, tarinfo, status, type):
  1419. with self.create_helper(tarinfo, status, type) as (item, status):
  1420. return status
  1421. def process_fifo(self, *, tarinfo, status, type):
  1422. with self.create_helper(tarinfo, status, type) as (item, status):
  1423. return status
  1424. def process_dev(self, *, tarinfo, status, type):
  1425. with self.create_helper(tarinfo, status, type) as (item, status):
  1426. item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
  1427. return status
  1428. def process_symlink(self, *, tarinfo, status, type):
  1429. with self.create_helper(tarinfo, status, type) as (item, status):
  1430. item.source = tarinfo.linkname
  1431. return status
  1432. def process_hardlink(self, *, tarinfo, status, type):
  1433. with self.create_helper(tarinfo, status, type) as (item, status):
  1434. # create a not hardlinked borg item, reusing the chunks, see HardLinkManager.__doc__
  1435. chunks = self.hlm.retrieve(tarinfo.linkname)
  1436. if chunks is not None:
  1437. item.chunks = chunks
  1438. item.get_size(memorize=True, from_chunks=True)
  1439. self.stats.nfiles += 1
  1440. return status
  1441. def process_file(self, *, tarinfo, status, type, tar):
  1442. with self.create_helper(tarinfo, status, type) as (item, status):
  1443. self.print_file_status(status, tarinfo.name)
  1444. status = None # we already printed the status
  1445. fd = tar.extractfile(tarinfo)
  1446. self.process_file_chunks(
  1447. item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
  1448. )
  1449. item.get_size(memorize=True, from_chunks=True)
  1450. self.stats.nfiles += 1
  1451. # we need to remember ALL files, see HardLinkManager.__doc__
  1452. self.hlm.remember(id=tarinfo.name, info=item.chunks)
  1453. return status
  1454. def valid_msgpacked_dict(d, keys_serialized):
  1455. """check if the data <d> looks like a msgpacked dict"""
  1456. d_len = len(d)
  1457. if d_len == 0:
  1458. return False
  1459. if d[0] & 0xF0 == 0x80: # object is a fixmap (up to 15 elements)
  1460. offs = 1
  1461. elif d[0] == 0xDE: # object is a map16 (up to 2^16-1 elements)
  1462. offs = 3
  1463. else:
  1464. # object is not a map (dict)
  1465. # note: we must not have dicts with > 2^16-1 elements
  1466. return False
  1467. if d_len <= offs:
  1468. return False
  1469. # is the first dict key a bytestring?
  1470. if d[offs] & 0xE0 == 0xA0: # key is a small bytestring (up to 31 chars)
  1471. pass
  1472. elif d[offs] in (0xD9, 0xDA, 0xDB): # key is a str8, str16 or str32
  1473. pass
  1474. else:
  1475. # key is not a bytestring
  1476. return False
  1477. # is the bytestring any of the expected key names?
  1478. key_serialized = d[offs:]
  1479. return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
  1480. class RobustUnpacker:
  1481. """A restartable/robust version of the streaming msgpack unpacker"""
  1482. def __init__(self, validator, item_keys):
  1483. super().__init__()
  1484. self.item_keys = [msgpack.packb(name) for name in item_keys]
  1485. self.validator = validator
  1486. self._buffered_data = []
  1487. self._resync = False
  1488. self._unpacker = msgpack.Unpacker(object_hook=StableDict)
  1489. def resync(self):
  1490. self._buffered_data = []
  1491. self._resync = True
  1492. def feed(self, data):
  1493. if self._resync:
  1494. self._buffered_data.append(data)
  1495. else:
  1496. self._unpacker.feed(data)
  1497. def __iter__(self):
  1498. return self
  1499. def __next__(self):
  1500. if self._resync:
  1501. data = b"".join(self._buffered_data)
  1502. while self._resync:
  1503. if not data:
  1504. raise StopIteration
  1505. # Abort early if the data does not look like a serialized item dict
  1506. if not valid_msgpacked_dict(data, self.item_keys):
  1507. data = data[1:]
  1508. continue
  1509. self._unpacker = msgpack.Unpacker(object_hook=StableDict)
  1510. self._unpacker.feed(data)
  1511. try:
  1512. item = next(self._unpacker)
  1513. except (msgpack.UnpackException, StopIteration):
  1514. # as long as we are resyncing, we also ignore StopIteration
  1515. pass
  1516. else:
  1517. if self.validator(item):
  1518. self._resync = False
  1519. return item
  1520. data = data[1:]
  1521. else:
  1522. return next(self._unpacker)
  1523. class ArchiveChecker:
  1524. def __init__(self):
  1525. self.error_found = False
  1526. self.possibly_superseded = set()
  1527. def check(self, repository, repair=False, first=0, last=0, sort_by="", match=None, verify_data=False):
  1528. """Perform a set of checks on 'repository'
  1529. :param repair: enable repair mode, write updated or corrected data into repository
  1530. :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
  1531. :param match: only check archives matching this pattern
  1532. :param verify_data: integrity verification of data referenced by archives
  1533. """
  1534. logger.info("Starting archive consistency check...")
  1535. self.check_all = not any((first, last, match))
  1536. self.repair = repair
  1537. self.repository = repository
  1538. self.init_chunks()
  1539. if not self.chunks:
  1540. logger.error("Repository contains no apparent data at all, cannot continue check/repair.")
  1541. return False
  1542. self.key = self.make_key(repository)
  1543. self.repo_objs = RepoObj(self.key)
  1544. if verify_data:
  1545. self.verify_data()
  1546. if Manifest.MANIFEST_ID not in self.chunks:
  1547. logger.error("Repository manifest not found!")
  1548. self.error_found = True
  1549. self.manifest = self.rebuild_manifest()
  1550. else:
  1551. try:
  1552. self.manifest = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
  1553. except IntegrityErrorBase as exc:
  1554. logger.error("Repository manifest is corrupted: %s", exc)
  1555. self.error_found = True
  1556. del self.chunks[Manifest.MANIFEST_ID]
  1557. self.manifest = self.rebuild_manifest()
  1558. self.rebuild_refcounts(match=match, first=first, last=last, sort_by=sort_by)
  1559. self.orphan_chunks_check()
  1560. self.finish()
  1561. if self.error_found:
  1562. logger.error("Archive consistency check complete, problems found.")
  1563. else:
  1564. logger.info("Archive consistency check complete, no problems found.")
  1565. return self.repair or not self.error_found
  1566. def init_chunks(self):
  1567. """Fetch a list of all object keys from repository"""
  1568. # Explicitly set the initial usable hash table capacity to avoid performance issues
  1569. # due to hash table "resonance".
  1570. # Since reconstruction of archive items can add some new chunks, add 10 % headroom.
  1571. self.chunks = ChunkIndex(usable=len(self.repository) * 1.1)
  1572. marker = None
  1573. while True:
  1574. result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
  1575. if not result:
  1576. break
  1577. marker = result[-1]
  1578. init_entry = ChunkIndexEntry(refcount=0, size=0)
  1579. for id_ in result:
  1580. self.chunks[id_] = init_entry
  1581. def make_key(self, repository):
  1582. attempt = 0
  1583. for chunkid, _ in self.chunks.iteritems():
  1584. attempt += 1
  1585. if attempt > 999:
  1586. # we did a lot of attempts, but could not create the key via key_factory, give up.
  1587. break
  1588. cdata = repository.get(chunkid)
  1589. try:
  1590. return key_factory(repository, cdata)
  1591. except UnsupportedPayloadError:
  1592. # we get here, if the cdata we got has a corrupted key type byte
  1593. pass # ignore it, just try the next chunk
  1594. if attempt == 0:
  1595. msg = "make_key: repository has no chunks at all!"
  1596. else:
  1597. msg = "make_key: failed to create the key (tried %d chunks)" % attempt
  1598. raise IntegrityError(msg)
  1599. def verify_data(self):
  1600. logger.info("Starting cryptographic data integrity verification...")
  1601. chunks_count_index = len(self.chunks)
  1602. chunks_count_segments = 0
  1603. errors = 0
  1604. # for the new crypto, derived from AEADKeyBase, we know that it checks authenticity on
  1605. # the crypto.low_level level - invalid chunks will fail to AEAD authenticate.
  1606. # for these key types, we know that there is no need to decompress the data afterwards.
  1607. # for all other modes, we assume that we must decompress, so we can verify authenticity
  1608. # based on the plaintext MAC (via calling ._assert_id(id, plaintext)).
  1609. decompress = not isinstance(self.key, AEADKeyBase)
  1610. defect_chunks = []
  1611. pi = ProgressIndicatorPercent(
  1612. total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01, msgid="check.verify_data"
  1613. )
  1614. state = None
  1615. while True:
  1616. chunk_ids, state = self.repository.scan(limit=100, state=state)
  1617. if not chunk_ids:
  1618. break
  1619. chunks_count_segments += len(chunk_ids)
  1620. chunk_data_iter = self.repository.get_many(chunk_ids)
  1621. chunk_ids_revd = list(reversed(chunk_ids))
  1622. while chunk_ids_revd:
  1623. pi.show()
  1624. chunk_id = chunk_ids_revd.pop(-1) # better efficiency
  1625. try:
  1626. encrypted_data = next(chunk_data_iter)
  1627. except (Repository.ObjectNotFound, IntegrityErrorBase) as err:
  1628. self.error_found = True
  1629. errors += 1
  1630. logger.error("chunk %s: %s", bin_to_hex(chunk_id), err)
  1631. if isinstance(err, IntegrityErrorBase):
  1632. defect_chunks.append(chunk_id)
  1633. # as the exception killed our generator, make a new one for remaining chunks:
  1634. if chunk_ids_revd:
  1635. chunk_ids = list(reversed(chunk_ids_revd))
  1636. chunk_data_iter = self.repository.get_many(chunk_ids)
  1637. else:
  1638. try:
  1639. self.repo_objs.parse(chunk_id, encrypted_data, decompress=decompress)
  1640. except IntegrityErrorBase as integrity_error:
  1641. self.error_found = True
  1642. errors += 1
  1643. logger.error("chunk %s, integrity error: %s", bin_to_hex(chunk_id), integrity_error)
  1644. defect_chunks.append(chunk_id)
  1645. pi.finish()
  1646. if chunks_count_index != chunks_count_segments:
  1647. logger.error("Repo/Chunks index object count vs. segment files object count mismatch.")
  1648. logger.error(
  1649. "Repo/Chunks index: %d objects != segment files: %d objects", chunks_count_index, chunks_count_segments
  1650. )
  1651. if defect_chunks:
  1652. if self.repair:
  1653. # if we kill the defect chunk here, subsequent actions within this "borg check"
  1654. # run will find missing chunks and replace them with all-zero replacement
  1655. # chunks and flag the files as "repaired".
  1656. # if another backup is done later and the missing chunks get backed up again,
  1657. # a "borg check" afterwards can heal all files where this chunk was missing.
  1658. logger.warning(
  1659. "Found defect chunks. They will be deleted now, so affected files can "
  1660. "get repaired now and maybe healed later."
  1661. )
  1662. for defect_chunk in defect_chunks:
  1663. # remote repo (ssh): retry might help for strange network / NIC / RAM errors
  1664. # as the chunk will be retransmitted from remote server.
  1665. # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
  1666. # a defect chunk is likely not in the fs cache any more and really gets re-read
  1667. # from the underlying media.
  1668. try:
  1669. encrypted_data = self.repository.get(defect_chunk)
  1670. self.repo_objs.parse(defect_chunk, encrypted_data, decompress=decompress)
  1671. except IntegrityErrorBase:
  1672. # failed twice -> get rid of this chunk
  1673. del self.chunks[defect_chunk]
  1674. self.repository.delete(defect_chunk)
  1675. logger.debug("chunk %s deleted.", bin_to_hex(defect_chunk))
  1676. else:
  1677. logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk))
  1678. else:
  1679. logger.warning(
  1680. "Found defect chunks. With --repair, they would get deleted, so affected "
  1681. "files could get repaired then and maybe healed later."
  1682. )
  1683. for defect_chunk in defect_chunks:
  1684. logger.debug("chunk %s is defect.", bin_to_hex(defect_chunk))
  1685. log = logger.error if errors else logger.info
  1686. log(
  1687. "Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.",
  1688. chunks_count_segments,
  1689. errors,
  1690. )
  1691. def rebuild_manifest(self):
  1692. """Rebuild the manifest object if it is missing
  1693. Iterates through all objects in the repository looking for archive metadata blocks.
  1694. """
  1695. def valid_archive(obj):
  1696. if not isinstance(obj, dict):
  1697. return False
  1698. return REQUIRED_ARCHIVE_KEYS.issubset(obj)
  1699. logger.info("Rebuilding missing manifest, this might take some time...")
  1700. # as we have lost the manifest, we do not know any more what valid item keys we had.
  1701. # collecting any key we encounter in a damaged repo seems unwise, thus we just use
  1702. # the hardcoded list from the source code. thus, it is not recommended to rebuild a
  1703. # lost manifest on a older borg version than the most recent one that was ever used
  1704. # within this repository (assuming that newer borg versions support more item keys).
  1705. manifest = Manifest(self.key, self.repository)
  1706. archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
  1707. pi = ProgressIndicatorPercent(
  1708. total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01, msgid="check.rebuild_manifest"
  1709. )
  1710. for chunk_id, _ in self.chunks.iteritems():
  1711. pi.show()
  1712. cdata = self.repository.get(chunk_id)
  1713. try:
  1714. _, data = self.repo_objs.parse(chunk_id, cdata)
  1715. except IntegrityErrorBase as exc:
  1716. logger.error("Skipping corrupted chunk: %s", exc)
  1717. self.error_found = True
  1718. continue
  1719. if not valid_msgpacked_dict(data, archive_keys_serialized):
  1720. continue
  1721. if b"cmdline" not in data or b"\xa7version\x02" not in data:
  1722. continue
  1723. try:
  1724. archive = msgpack.unpackb(data)
  1725. # Ignore exceptions that might be raised when feeding msgpack with invalid data
  1726. except msgpack.UnpackException:
  1727. continue
  1728. if valid_archive(archive):
  1729. archive = ArchiveItem(internal_dict=archive)
  1730. name = archive.name
  1731. logger.info("Found archive %s", name)
  1732. if name in manifest.archives:
  1733. i = 1
  1734. while True:
  1735. new_name = "%s.%d" % (name, i)
  1736. if new_name not in manifest.archives:
  1737. break
  1738. i += 1
  1739. logger.warning("Duplicate archive name %s, storing as %s", name, new_name)
  1740. name = new_name
  1741. manifest.archives[name] = (chunk_id, archive.time)
  1742. pi.finish()
  1743. logger.info("Manifest rebuild complete.")
  1744. return manifest
  1745. def rebuild_refcounts(self, first=0, last=0, sort_by="", match=None):
  1746. """Rebuild object reference counts by walking the metadata
  1747. Missing and/or incorrect data is repaired when detected
  1748. """
  1749. # Exclude the manifest from chunks (manifest entry might be already deleted from self.chunks)
  1750. self.chunks.pop(Manifest.MANIFEST_ID, None)
  1751. def mark_as_possibly_superseded(id_):
  1752. if self.chunks.get(id_, ChunkIndexEntry(0, 0)).refcount == 0:
  1753. self.possibly_superseded.add(id_)
  1754. def add_callback(chunk):
  1755. id_ = self.key.id_hash(chunk)
  1756. cdata = self.repo_objs.format(id_, {}, chunk)
  1757. add_reference(id_, len(chunk), cdata)
  1758. return id_
  1759. def add_reference(id_, size, cdata=None):
  1760. try:
  1761. self.chunks.incref(id_)
  1762. except KeyError:
  1763. assert cdata is not None
  1764. self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size)
  1765. if self.repair:
  1766. self.repository.put(id_, cdata)
  1767. def verify_file_chunks(archive_name, item):
  1768. """Verifies that all file chunks are present.
  1769. Missing file chunks will be replaced with new chunks of the same length containing all zeros.
  1770. If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
  1771. """
  1772. def replacement_chunk(size):
  1773. chunk = Chunk(None, allocation=CH_ALLOC, size=size)
  1774. chunk_id, data = cached_hash(chunk, self.key.id_hash)
  1775. cdata = self.repo_objs.format(chunk_id, {}, data)
  1776. return chunk_id, size, cdata
  1777. offset = 0
  1778. chunk_list = []
  1779. chunks_replaced = False
  1780. has_chunks_healthy = "chunks_healthy" in item
  1781. chunks_current = item.chunks
  1782. chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
  1783. if has_chunks_healthy and len(chunks_current) != len(chunks_healthy):
  1784. # should never happen, but there was issue #3218.
  1785. logger.warning(f"{archive_name}: {item.path}: Invalid chunks_healthy metadata removed!")
  1786. del item.chunks_healthy
  1787. has_chunks_healthy = False
  1788. chunks_healthy = chunks_current
  1789. for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
  1790. chunk_id, size = chunk_healthy
  1791. if chunk_id not in self.chunks:
  1792. # a chunk of the healthy list is missing
  1793. if chunk_current == chunk_healthy:
  1794. logger.error(
  1795. "{}: {}: New missing file chunk detected (Byte {}-{}, Chunk {}). "
  1796. "Replacing with all-zero chunk.".format(
  1797. archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
  1798. )
  1799. )
  1800. self.error_found = chunks_replaced = True
  1801. chunk_id, size, cdata = replacement_chunk(size)
  1802. add_reference(chunk_id, size, cdata)
  1803. else:
  1804. logger.info(
  1805. "{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). "
  1806. "It has an all-zero replacement chunk already.".format(
  1807. archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
  1808. )
  1809. )
  1810. chunk_id, size = chunk_current
  1811. if chunk_id in self.chunks:
  1812. add_reference(chunk_id, size)
  1813. else:
  1814. logger.warning(
  1815. "{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}, Chunk {}). "
  1816. "Generating new replacement chunk.".format(
  1817. archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
  1818. )
  1819. )
  1820. self.error_found = chunks_replaced = True
  1821. chunk_id, size, cdata = replacement_chunk(size)
  1822. add_reference(chunk_id, size, cdata)
  1823. else:
  1824. if chunk_current == chunk_healthy:
  1825. # normal case, all fine.
  1826. add_reference(chunk_id, size)
  1827. else:
  1828. logger.info(
  1829. "{}: {}: Healed previously missing file chunk! (Byte {}-{}, Chunk {}).".format(
  1830. archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
  1831. )
  1832. )
  1833. add_reference(chunk_id, size)
  1834. mark_as_possibly_superseded(chunk_current[0]) # maybe orphaned the all-zero replacement chunk
  1835. chunk_list.append([chunk_id, size]) # list-typed element as chunks_healthy is list-of-lists
  1836. offset += size
  1837. if chunks_replaced and not has_chunks_healthy:
  1838. # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
  1839. item.chunks_healthy = item.chunks
  1840. if has_chunks_healthy and chunk_list == chunks_healthy:
  1841. logger.info(f"{archive_name}: {item.path}: Completely healed previously damaged file!")
  1842. del item.chunks_healthy
  1843. item.chunks = chunk_list
  1844. if "size" in item:
  1845. item_size = item.size
  1846. item_chunks_size = item.get_size(from_chunks=True)
  1847. if item_size != item_chunks_size:
  1848. # just warn, but keep the inconsistency, so that borg extract can warn about it.
  1849. logger.warning(
  1850. "{}: {}: size inconsistency detected: size {}, chunks size {}".format(
  1851. archive_name, item.path, item_size, item_chunks_size
  1852. )
  1853. )
  1854. def robust_iterator(archive):
  1855. """Iterates through all archive items
  1856. Missing item chunks will be skipped and the msgpack stream will be restarted
  1857. """
  1858. item_keys = self.manifest.item_keys
  1859. required_item_keys = REQUIRED_ITEM_KEYS
  1860. unpacker = RobustUnpacker(
  1861. lambda item: isinstance(item, StableDict) and "path" in item, self.manifest.item_keys
  1862. )
  1863. _state = 0
  1864. def missing_chunk_detector(chunk_id):
  1865. nonlocal _state
  1866. if _state % 2 != int(chunk_id not in self.chunks):
  1867. _state += 1
  1868. return _state
  1869. def report(msg, chunk_id, chunk_no):
  1870. cid = bin_to_hex(chunk_id)
  1871. msg += " [chunk: %06d_%s]" % (chunk_no, cid) # see "debug dump-archive-items"
  1872. self.error_found = True
  1873. logger.error(msg)
  1874. def list_keys_safe(keys):
  1875. return ", ".join(k.decode(errors="replace") if isinstance(k, bytes) else str(k) for k in keys)
  1876. def valid_item(obj):
  1877. if not isinstance(obj, StableDict):
  1878. return False, "not a dictionary"
  1879. keys = set(obj)
  1880. if not required_item_keys.issubset(keys):
  1881. return False, "missing required keys: " + list_keys_safe(required_item_keys - keys)
  1882. if not keys.issubset(item_keys):
  1883. return False, "invalid keys: " + list_keys_safe(keys - item_keys)
  1884. return True, ""
  1885. i = 0
  1886. archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository)
  1887. for state, items in groupby(archive_items, missing_chunk_detector):
  1888. items = list(items)
  1889. if state % 2:
  1890. for chunk_id in items:
  1891. report("item metadata chunk missing", chunk_id, i)
  1892. i += 1
  1893. continue
  1894. if state > 0:
  1895. unpacker.resync()
  1896. for chunk_id, cdata in zip(items, repository.get_many(items)):
  1897. try:
  1898. _, data = self.repo_objs.parse(chunk_id, cdata)
  1899. unpacker.feed(data)
  1900. for item in unpacker:
  1901. valid, reason = valid_item(item)
  1902. if valid:
  1903. yield Item(internal_dict=item)
  1904. else:
  1905. report(
  1906. "Did not get expected metadata dict when unpacking item metadata (%s)" % reason,
  1907. chunk_id,
  1908. i,
  1909. )
  1910. except IntegrityError as integrity_error:
  1911. # repo_objs.parse() detected integrity issues.
  1912. # maybe the repo gave us a valid cdata, but not for the chunk_id we wanted.
  1913. # or the authentication of cdata failed, meaning the encrypted data was corrupted.
  1914. report(str(integrity_error), chunk_id, i)
  1915. except msgpack.UnpackException:
  1916. report("Unpacker crashed while unpacking item metadata, trying to resync...", chunk_id, i)
  1917. unpacker.resync()
  1918. except Exception:
  1919. report("Exception while decrypting or unpacking item metadata", chunk_id, i)
  1920. raise
  1921. i += 1
  1922. sort_by = sort_by.split(",")
  1923. if any((first, last, match)):
  1924. archive_infos = self.manifest.archives.list(sort_by=sort_by, match=match, first=first, last=last)
  1925. if match and not archive_infos:
  1926. logger.warning("--match-archives %s does not match any archives", match)
  1927. if first and len(archive_infos) < first:
  1928. logger.warning("--first %d archives: only found %d archives", first, len(archive_infos))
  1929. if last and len(archive_infos) < last:
  1930. logger.warning("--last %d archives: only found %d archives", last, len(archive_infos))
  1931. else:
  1932. archive_infos = self.manifest.archives.list(sort_by=sort_by)
  1933. num_archives = len(archive_infos)
  1934. pi = ProgressIndicatorPercent(
  1935. total=num_archives, msg="Checking archives %3.1f%%", step=0.1, msgid="check.rebuild_refcounts"
  1936. )
  1937. with cache_if_remote(self.repository) as repository:
  1938. for i, info in enumerate(archive_infos):
  1939. pi.show(i)
  1940. logger.info(f"Analyzing archive {info.name} ({i + 1}/{num_archives})")
  1941. archive_id = info.id
  1942. if archive_id not in self.chunks:
  1943. logger.error("Archive metadata block %s is missing!", bin_to_hex(archive_id))
  1944. self.error_found = True
  1945. del self.manifest.archives[info.name]
  1946. continue
  1947. mark_as_possibly_superseded(archive_id)
  1948. cdata = self.repository.get(archive_id)
  1949. try:
  1950. _, data = self.repo_objs.parse(archive_id, cdata)
  1951. except IntegrityError as integrity_error:
  1952. logger.error("Archive metadata block %s is corrupted: %s", bin_to_hex(archive_id), integrity_error)
  1953. self.error_found = True
  1954. del self.manifest.archives[info.name]
  1955. continue
  1956. archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
  1957. if archive.version != 2:
  1958. raise Exception("Unknown archive metadata version")
  1959. items_buffer = ChunkBuffer(self.key)
  1960. items_buffer.write_chunk = add_callback
  1961. for item in robust_iterator(archive):
  1962. if "chunks" in item:
  1963. verify_file_chunks(info.name, item)
  1964. items_buffer.add(item)
  1965. items_buffer.flush(flush=True)
  1966. for previous_item_id in archive_get_items(
  1967. archive, repo_objs=self.repo_objs, repository=self.repository
  1968. ):
  1969. mark_as_possibly_superseded(previous_item_id)
  1970. for previous_item_ptr in archive.item_ptrs:
  1971. mark_as_possibly_superseded(previous_item_ptr)
  1972. archive.item_ptrs = archive_put_items(
  1973. items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
  1974. )
  1975. data = msgpack.packb(archive.as_dict())
  1976. new_archive_id = self.key.id_hash(data)
  1977. cdata = self.repo_objs.format(new_archive_id, {}, data)
  1978. add_reference(new_archive_id, len(data), cdata)
  1979. self.manifest.archives[info.name] = (new_archive_id, info.ts)
  1980. pi.finish()
  1981. def orphan_chunks_check(self):
  1982. if self.check_all:
  1983. unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
  1984. orphaned = unused - self.possibly_superseded
  1985. if orphaned:
  1986. logger.error(f"{len(orphaned)} orphaned objects found!")
  1987. self.error_found = True
  1988. if self.repair and unused:
  1989. logger.info(
  1990. "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
  1991. )
  1992. for id_ in unused:
  1993. self.repository.delete(id_)
  1994. logger.info("Finished deleting orphaned/superseded objects.")
  1995. else:
  1996. logger.info("Orphaned objects check skipped (needs all archives checked).")
  1997. def finish(self):
  1998. if self.repair:
  1999. logger.info("Writing Manifest.")
  2000. self.manifest.write()
  2001. logger.info("Committing repo.")
  2002. self.repository.commit(compact=False)
  2003. class ArchiveRecreater:
  2004. class Interrupted(Exception):
  2005. def __init__(self, metadata=None):
  2006. self.metadata = metadata or {}
  2007. @staticmethod
  2008. def is_temporary_archive(archive_name):
  2009. return archive_name.endswith(".recreate")
  2010. def __init__(
  2011. self,
  2012. manifest,
  2013. cache,
  2014. matcher,
  2015. exclude_caches=False,
  2016. exclude_if_present=None,
  2017. keep_exclude_tags=False,
  2018. chunker_params=None,
  2019. compression=None,
  2020. recompress=False,
  2021. always_recompress=False,
  2022. dry_run=False,
  2023. stats=False,
  2024. progress=False,
  2025. file_status_printer=None,
  2026. timestamp=None,
  2027. checkpoint_interval=1800,
  2028. ):
  2029. self.manifest = manifest
  2030. self.repository = manifest.repository
  2031. self.key = manifest.key
  2032. self.repo_objs = manifest.repo_objs
  2033. self.cache = cache
  2034. self.matcher = matcher
  2035. self.exclude_caches = exclude_caches
  2036. self.exclude_if_present = exclude_if_present or []
  2037. self.keep_exclude_tags = keep_exclude_tags
  2038. self.rechunkify = chunker_params is not None
  2039. if self.rechunkify:
  2040. logger.debug("Rechunking archives to %s", chunker_params)
  2041. self.chunker_params = chunker_params or CHUNKER_PARAMS
  2042. self.recompress = recompress
  2043. self.always_recompress = always_recompress
  2044. self.compression = compression or CompressionSpec("none")
  2045. self.seen_chunks = set()
  2046. self.timestamp = timestamp
  2047. self.dry_run = dry_run
  2048. self.stats = stats
  2049. self.progress = progress
  2050. self.print_file_status = file_status_printer or (lambda *args: None)
  2051. self.checkpoint_interval = None if dry_run else checkpoint_interval
  2052. def recreate(self, archive_name, comment=None, target_name=None):
  2053. assert not self.is_temporary_archive(archive_name)
  2054. archive = self.open_archive(archive_name)
  2055. target = self.create_target(archive, target_name)
  2056. if self.exclude_if_present or self.exclude_caches:
  2057. self.matcher_add_tagged_dirs(archive)
  2058. if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
  2059. return False
  2060. self.process_items(archive, target)
  2061. replace_original = target_name is None
  2062. self.save(archive, target, comment, replace_original=replace_original)
  2063. return True
  2064. def process_items(self, archive, target):
  2065. matcher = self.matcher
  2066. for item in archive.iter_items():
  2067. if not matcher.match(item.path):
  2068. self.print_file_status("x", item.path)
  2069. continue
  2070. if self.dry_run:
  2071. self.print_file_status("-", item.path)
  2072. else:
  2073. self.process_item(archive, target, item)
  2074. if self.progress:
  2075. target.stats.show_progress(final=True)
  2076. def process_item(self, archive, target, item):
  2077. status = file_status(item.mode)
  2078. if "chunks" in item:
  2079. self.print_file_status(status, item.path)
  2080. status = None
  2081. self.process_chunks(archive, target, item)
  2082. target.stats.nfiles += 1
  2083. target.add_item(item, stats=target.stats)
  2084. self.print_file_status(status, item.path)
  2085. def process_chunks(self, archive, target, item):
  2086. if not self.recompress and not target.recreate_rechunkify:
  2087. for chunk_id, size in item.chunks:
  2088. self.cache.chunk_incref(chunk_id, target.stats)
  2089. return item.chunks
  2090. chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
  2091. chunk_processor = partial(self.chunk_processor, target)
  2092. target.process_file_chunks(item, self.cache, target.stats, self.progress, chunk_iterator, chunk_processor)
  2093. def chunk_processor(self, target, chunk):
  2094. chunk_id, data = cached_hash(chunk, self.key.id_hash)
  2095. if chunk_id in self.seen_chunks:
  2096. return self.cache.chunk_incref(chunk_id, target.stats)
  2097. overwrite = self.recompress
  2098. if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
  2099. # Check if this chunk is already compressed the way we want it
  2100. old_meta = self.repo_objs.parse_meta(chunk_id, self.repository.get(chunk_id, read_data=False))
  2101. compr_hdr = bytes((old_meta["ctype"], old_meta["clevel"]))
  2102. compressor_cls, level = Compressor.detect(compr_hdr)
  2103. if (
  2104. compressor_cls.name == self.repo_objs.compressor.decide({}, data).name
  2105. and level == self.repo_objs.compressor.level
  2106. ):
  2107. # Stored chunk has the same compression method and level as we wanted
  2108. overwrite = False
  2109. chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, overwrite=overwrite, wait=False)
  2110. self.cache.repository.async_response(wait=False)
  2111. self.seen_chunks.add(chunk_entry.id)
  2112. return chunk_entry
  2113. def iter_chunks(self, archive, target, chunks):
  2114. chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks])
  2115. if target.recreate_rechunkify:
  2116. # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
  2117. # (does not load the entire file into memory)
  2118. file = ChunkIteratorFileWrapper(chunk_iterator)
  2119. yield from target.chunker.chunkify(file)
  2120. else:
  2121. for chunk in chunk_iterator:
  2122. yield Chunk(chunk, size=len(chunk), allocation=CH_DATA)
  2123. def save(self, archive, target, comment=None, replace_original=True):
  2124. if self.dry_run:
  2125. return
  2126. if comment is None:
  2127. comment = archive.metadata.get("comment", "")
  2128. # Keep for the statistics if necessary
  2129. if self.stats:
  2130. _start = target.start
  2131. if self.timestamp is None:
  2132. additional_metadata = {
  2133. "time": archive.metadata.time,
  2134. "time_end": archive.metadata.get("time_end") or archive.metadata.time,
  2135. "cmdline": archive.metadata.cmdline,
  2136. # but also remember recreate metadata:
  2137. "recreate_cmdline": sys.argv,
  2138. }
  2139. else:
  2140. additional_metadata = {
  2141. "cmdline": archive.metadata.cmdline,
  2142. # but also remember recreate metadata:
  2143. "recreate_cmdline": sys.argv,
  2144. }
  2145. target.save(comment=comment, timestamp=self.timestamp, additional_metadata=additional_metadata)
  2146. if replace_original:
  2147. archive.delete(Statistics(), progress=self.progress)
  2148. target.rename(archive.name)
  2149. if self.stats:
  2150. target.start = _start
  2151. target.end = archive_ts_now()
  2152. log_multi(str(target), str(target.stats))
  2153. def matcher_add_tagged_dirs(self, archive):
  2154. """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
  2155. def exclude(dir, tag_item):
  2156. if self.keep_exclude_tags:
  2157. tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False))
  2158. tagged_dirs.append(FnmatchPattern(dir + "/", recurse_dir=False))
  2159. else:
  2160. tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False))
  2161. matcher = self.matcher
  2162. tag_files = []
  2163. tagged_dirs = []
  2164. for item in archive.iter_items(
  2165. filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)
  2166. ):
  2167. dir, tag_file = os.path.split(item.path)
  2168. if tag_file in self.exclude_if_present:
  2169. exclude(dir, item)
  2170. elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode):
  2171. file = open_item(archive, item)
  2172. if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS:
  2173. exclude(dir, item)
  2174. matcher.add(tag_files, IECommand.Include)
  2175. matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse)
  2176. def create_target(self, archive, target_name=None):
  2177. """Create target archive."""
  2178. target_name = target_name or archive.name + ".recreate"
  2179. target = self.create_target_archive(target_name)
  2180. # If the archives use the same chunker params, then don't rechunkify
  2181. source_chunker_params = tuple(archive.metadata.get("chunker_params", []))
  2182. if len(source_chunker_params) == 4 and isinstance(source_chunker_params[0], int):
  2183. # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash:
  2184. source_chunker_params = (CH_BUZHASH,) + source_chunker_params
  2185. target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
  2186. if target.recreate_rechunkify:
  2187. logger.debug(
  2188. "Rechunking archive from %s to %s", source_chunker_params or "(unknown)", target.chunker_params
  2189. )
  2190. target.process_file_chunks = ChunksProcessor(
  2191. cache=self.cache,
  2192. key=self.key,
  2193. add_item=target.add_item,
  2194. write_checkpoint=target.write_checkpoint,
  2195. checkpoint_interval=self.checkpoint_interval,
  2196. rechunkify=target.recreate_rechunkify,
  2197. ).process_file_chunks
  2198. target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed, sparse=False)
  2199. return target
  2200. def create_target_archive(self, name):
  2201. target = Archive(
  2202. self.manifest,
  2203. name,
  2204. create=True,
  2205. progress=self.progress,
  2206. chunker_params=self.chunker_params,
  2207. cache=self.cache,
  2208. checkpoint_interval=self.checkpoint_interval,
  2209. )
  2210. return target
  2211. def open_archive(self, name, **kwargs):
  2212. return Archive(self.manifest, name, cache=self.cache, **kwargs)