archive.py

import base64
import errno
import json
import os
import stat
import sys
import time
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from datetime import timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby, zip_longest
from collections.abc import Iterator
from shutil import get_terminal_size

from .platformflags import is_win32

from .logger import create_logger

logger = create_logger()

from . import xattr
from .chunkers import get_chunker, Chunk
from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
from .crypto.key import key_factory, UnsupportedPayloadError
from .compress import CompressionSpec
from .constants import *  # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .helpers import BackupError, BackupRaceConditionError, BackupItemExcluded
from .helpers import BackupOSError, BackupPermissionError, BackupFileNotFoundError, BackupIOError
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import HardLinkManager
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .platform import uid2user, user2uid, gid2group, group2gid, get_birthtime_ns
from .helpers import parse_timestamp, archive_ts_now
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, make_path_safe, remove_surrogates, text_to_json, join_cmd, remove_dotdot_prefixes
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import os_open, flags_normal, flags_dir
from .helpers import os_stat
from .helpers import msgpack
from .helpers.lrucache import LRUCache
from .manifest import Manifest
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
from .remote import RemoteRepository, cache_if_remote
from .repository import Repository, NoManifestError
from .repoobj import RepoObj

has_link = hasattr(os, "link")


class Statistics:
    def __init__(self, output_json=False, iec=False):
        self.output_json = output_json
        self.iec = iec
        self.osize = self.usize = self.nfiles = 0
        self.last_progress = 0  # timestamp when last progress was shown
        self.files_stats = defaultdict(int)
        self.chunking_time = 0.0
        self.hashing_time = 0.0
        self.rx_bytes = 0
        self.tx_bytes = 0

    def update(self, size, unique):
        self.osize += size
        if unique:
            self.usize += size

    def __add__(self, other):
        if not isinstance(other, Statistics):
            raise TypeError("can only add Statistics objects")
        stats = Statistics(self.output_json, self.iec)
        stats.osize = self.osize + other.osize
        stats.usize = self.usize + other.usize
        stats.nfiles = self.nfiles + other.nfiles
        stats.chunking_time = self.chunking_time + other.chunking_time
        stats.hashing_time = self.hashing_time + other.hashing_time
        st1, st2 = self.files_stats, other.files_stats
        stats.files_stats = defaultdict(int, {key: (st1[key] + st2[key]) for key in st1.keys() | st2.keys()})
        return stats

    def __str__(self):
        hashing_time = format_timedelta(timedelta(seconds=self.hashing_time))
        chunking_time = format_timedelta(timedelta(seconds=self.chunking_time))
        return """\
Number of files: {stats.nfiles}
Original size: {stats.osize_fmt}
Deduplicated size: {stats.usize_fmt}
Time spent in hashing: {hashing_time}
Time spent in chunking: {chunking_time}
Added files: {added_files}
Unchanged files: {unchanged_files}
Modified files: {modified_files}
Error files: {error_files}
Files changed while reading: {files_changed_while_reading}
Bytes read from remote: {stats.rx_bytes}
Bytes sent to remote: {stats.tx_bytes}
""".format(
            stats=self,
            hashing_time=hashing_time,
            chunking_time=chunking_time,
            added_files=self.files_stats["A"],
            unchanged_files=self.files_stats["U"],
            modified_files=self.files_stats["M"],
            error_files=self.files_stats["E"],
            files_changed_while_reading=self.files_stats["C"],
        )

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self
        )

    def as_dict(self):
        return {
            "original_size": FileSize(self.osize, iec=self.iec),
            "nfiles": self.nfiles,
            "hashing_time": self.hashing_time,
            "chunking_time": self.chunking_time,
            "files_stats": self.files_stats,
        }

    def as_raw_dict(self):
        return {"size": self.osize, "nfiles": self.nfiles}

    @classmethod
    def from_raw_dict(cls, **kw):
        self = cls()
        self.osize = kw["size"]
        self.nfiles = kw["nfiles"]
        return self

    @property
    def osize_fmt(self):
        return format_file_size(self.osize, iec=self.iec)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize, iec=self.iec)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                if not final:
                    data = self.as_dict()
                    if item:
                        data.update(text_to_json("path", item.path))
                else:
                    data = {}
                data.update({"time": time.time(), "type": "archive_progress", "finished": final})
                msg = json.dumps(data)
                end = "\n"
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = "{0.osize_fmt} O {0.usize_fmt} U {0.nfiles} N ".format(self)
                    path = remove_surrogates(item.path) if item else ""
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ""
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = " " * columns
                end = "\r"
            print(msg, end=end, file=stream or sys.stderr, flush=True)
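
# Illustrative note (not part of the original module): Statistics counts every byte it sees
# in osize and only the deduplicated ("unique") bytes in usize, e.g.:
#
#     stats = Statistics()
#     stats.update(4096, unique=True)   # new chunk: osize += 4096, usize += 4096
#     stats.update(4096, unique=False)  # duplicate chunk: only osize += 4096
#     # stats.osize == 8192, stats.usize == 4096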


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupIO:
    op = ""

    def __call__(self, op=""):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            E_MAP = {
                errno.EPERM: BackupPermissionError,
                errno.EISDIR: BackupPermissionError,
                errno.EACCES: BackupPermissionError,
                errno.EBUSY: BackupPermissionError,
                errno.ENOENT: BackupFileNotFoundError,
                errno.EIO: BackupIOError,
            }
            e_cls = E_MAP.get(exc_val.errno, BackupOSError)
            raise e_cls(self.op, exc_val) from exc_val


backup_io = BackupIO()


def backup_io_iter(iterator):
    backup_io.op = "read"
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item


def stat_update_check(st_old, st_curr):
    """
    this checks for some race conditions between the first filename-based stat()
    we did before dispatching to the (hopefully correct) file type backup handler
    and the (hopefully) fd-based fstat() we did in the handler.
    if there is a problematic difference (e.g. file type changed), we rather
    skip the file than being tricked into a security problem.
    such races should only happen if:
    - we are backing up a live filesystem (no snapshot, not inactive)
    - if files change due to normal fs activity at an unfortunate time
    - if somebody is doing an attack against us
    """
    # assuming that a file type change implicates a different inode change AND that inode numbers
    # are not duplicate in a short timeframe, this check is redundant and solved by the ino check:
    if stat.S_IFMT(st_old.st_mode) != stat.S_IFMT(st_curr.st_mode):
        # in this case, we dispatched to wrong handler - abort
        raise BackupRaceConditionError("file type changed (race condition), skipping file")
    if st_old.st_ino != st_curr.st_ino:
        # in this case, the hardlinks-related code in create_helper has the wrong inode - abort!
        raise BackupRaceConditionError("file inode changed (race condition), skipping file")
    # looks ok, we are still dealing with the same thing - return current stat:
    return st_curr


@contextmanager
def OsOpen(*, flags, path=None, parent_fd=None, name=None, noatime=False, op="open"):
    with backup_io(op):
        fd = os_open(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=noatime)
    try:
        yield fd
    finally:
        # On windows fd is None for directories.
        if fd is not None:
            os.close(fd)
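
# Illustrative note (not part of the original module): the module-level backup_io instance is
# used as a context manager that converts an OSError raised inside the block into the matching
# Backup*Error (see E_MAP above), tagged with the given operation name, e.g.:
#
#     with backup_io("open"):
#         fd = os_open(path=path, flags=flags_normal)  # ENOENT here becomes BackupFileNotFoundError("open", ...)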


class DownloadPipeline:
    def __init__(self, repository, repo_objs):
        self.repository = repository
        self.repo_objs = repo_objs
        self.hlids_preloaded = None

    def unpack_many(self, ids, *, filter=None):
        """
        Return iterator of items.
        *ids* is a chunk ID list of an item content data stream.
        *filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
        """
        self.hlids_preloaded = set()
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM, replacement_chunk=False):
            if data is None:
                continue  # archive stream chunk missing
            unpacker.feed(data)
            for _item in unpacker:
                item = Item(internal_dict=_item)
                if filter is None or filter(item):
                    if "chunks" in item:
                        item.chunks = [ChunkListEntry(*e) for e in item.chunks]
                    if "chunks_healthy" in item:  # legacy
                        item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
                    yield item

    def preload_item_chunks(self, item, optimize_hardlinks=False):
        """
        Preloads the content data chunks of an item (if any).
        optimize_hardlinks can be set to True if item chunks only need to be preloaded for
        1st hardlink, but not for any further hardlink to same inode / with same hlid.
        Returns True if chunks were preloaded.
        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        preload_chunks = False
        if "chunks" in item:
            if optimize_hardlinks:
                hlid = item.get("hlid", None)
                if hlid is None:
                    preload_chunks = True
                elif hlid in self.hlids_preloaded:
                    preload_chunks = False
                else:
                    # not having the hardlink's chunks already preloaded for other hardlink to same inode
                    preload_chunks = True
                    self.hlids_preloaded.add(hlid)
            else:
                preload_chunks = True
            if preload_chunks:
                self.repository.preload([c.id for c in item.chunks])
        return preload_chunks

    def fetch_many(self, chunks, is_preloaded=False, ro_type=None, replacement_chunk=True):
        assert ro_type is not None
        ids = []
        sizes = []
        if all(isinstance(chunk, ChunkListEntry) for chunk in chunks):
            for chunk in chunks:
                ids.append(chunk.id)
                sizes.append(chunk.size)
        elif all(isinstance(chunk, bytes) for chunk in chunks):
            ids = list(chunks)
            sizes = [None] * len(ids)
        else:
            raise TypeError(f"unsupported or mixed element types: {chunks}")
        for id, size, cdata in zip(
            ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded, raise_missing=False)
        ):
            if cdata is None:
                if replacement_chunk and size is not None:
                    logger.error(f"repository object {bin_to_hex(id)} missing, returning {size} zero bytes.")
                    data = zeros[:size]  # return an all-zero replacement chunk of correct size
                else:
                    logger.error(f"repository object {bin_to_hex(id)} missing, returning None.")
                    data = None
            else:
                _, data = self.repo_objs.parse(id, cdata, ro_type=ro_type)
                assert size is None or len(data) == size
            yield data
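
# Illustrative note (not part of the original module): DownloadPipeline.fetch_many() accepts either
# a list of ChunkListEntry namedtuples (id and size known) or a plain list of chunk id bytes, and
# yields one data blob per requested chunk, e.g.:
#
#     for data in pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
#         ...  # a missing chunk yields an all-zero blob of the expected size, or None if no replacement is possible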


class ChunkBuffer:
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer()
        self.chunks = []
        self.key = key
        self.chunker = get_chunker(*chunker_params, key=self.key, sparse=False)
        self.saved_chunks_len = None

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        # the metadata stream may produce all-zero chunks, so deal
        # with CH_ALLOC (and CH_HOLE, for completeness) here.
        chunks = []
        for chunk in self.chunker.chunkify(self.buffer):
            alloc = chunk.meta["allocation"]
            if alloc == CH_DATA:
                data = bytes(chunk.data)
            elif alloc in (CH_ALLOC, CH_HOLE):
                data = zeros[: chunk.meta["size"]]
            else:
                raise ValueError("chunk allocation has unsupported value of %r" % alloc)
            chunks.append(data)
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):
    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _ = self.cache.add_chunk(
            self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM
        )
        logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
        self.cache.repository.async_response(wait=False)
        return id_


def get_item_uid_gid(item, *, numeric, uid_forced=None, gid_forced=None, uid_default=0, gid_default=0):
    if uid_forced is not None:
        uid = uid_forced
    else:
        uid = None if numeric else user2uid(item.get("user"))
        uid = item.get("uid") if uid is None else uid
        if uid is None or uid < 0:
            uid = uid_default
    if gid_forced is not None:
        gid = gid_forced
    else:
        gid = None if numeric else group2gid(item.get("group"))
        gid = item.get("gid") if gid is None else gid
        if gid is None or gid < 0:
            gid = gid_default
    return uid, gid


def archive_get_items(metadata, *, repo_objs, repository):
    if "item_ptrs" in metadata:  # looks like a v2+ archive
        assert "items" not in metadata
        items = []
        for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
            _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
            ids = msgpack.unpackb(data)
            items.extend(ids)
        return items
    if "items" in metadata:  # legacy, v1 archive
        assert "item_ptrs" not in metadata
        return metadata.items


def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_reference=None):
    """gets a (potentially large) list of archive metadata stream chunk ids and writes them to repo objects"""
    item_ptrs = []
    for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
        data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
        id = repo_objs.id_hash(data)
        logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}")
        if cache is not None and stats is not None:
            cache.add_chunk(id, {}, data, stats=stats, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
        elif add_reference is not None:
            cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
            add_reference(id, len(data), cdata)
        else:
            raise NotImplementedError
        item_ptrs.append(id)
    return item_ptrs
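
# Illustrative note (not part of the original module): archive_put_items() batches the metadata
# stream chunk id list into groups of IDS_PER_CHUNK ids per repository object, so a list of
# 2.5 * IDS_PER_CHUNK ids produces 3 item_ptrs objects; archive_get_items() above performs the
# reverse mapping when reading an archive.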


class Archive:
    class AlreadyExists(Error):
        """Archive {} already exists"""

        exit_mcode = 30

    class DoesNotExist(Error):
        """Archive {} does not exist"""

        exit_mcode = 31

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

        exit_mcode = 32

    def __init__(
        self,
        manifest,
        name,
        *,
        cache=None,
        create=False,
        numeric_ids=False,
        noatime=False,
        noctime=False,
        noflags=False,
        noacls=False,
        noxattrs=False,
        progress=False,
        chunker_params=CHUNKER_PARAMS,
        start=None,
        start_monotonic=None,
        end=None,
        log_json=False,
        iec=False,
        deleted=False,
    ):
        name_is_id = isinstance(name, bytes)
        if not name_is_id:
            assert len(name) <= 255
        self.cwd = os.getcwd()
        assert isinstance(manifest, Manifest)
        self.manifest = manifest
        self.key = manifest.repo_objs.key
        self.repo_objs = manifest.repo_objs
        self.repository = manifest.repository
        self.cache = cache
        self.stats = Statistics(output_json=log_json, iec=iec)
        self.iec = iec
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.tags = None
        self.numeric_ids = numeric_ids
        self.noatime = noatime
        self.noctime = noctime
        self.noflags = noflags
        self.noacls = noacls
        self.noxattrs = noxattrs
        assert (start is None) == (
            start_monotonic is None
        ), "Logic error: if start is given, start_monotonic must be given as well and vice versa."
        if start is None:
            start = archive_ts_now()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = archive_ts_now()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.repo_objs)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.tags = set()
        else:
            if name_is_id:
                # we also go over the manifest here to avoid soft-deleted archives,
                # except if we explicitly request one via deleted=True.
                info = self.manifest.archives.get_by_id(name, deleted=deleted)
            else:
                info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)

    def _load_meta(self, id):
        cdata = self.repository.get(id)
        _, data = self.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)
        archive = self.key.unpack_archive(data)
        metadata = ArchiveItem(internal_dict=archive)
        if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
            raise Exception("Unknown archive metadata version")
        # note: metadata.items must not get written to disk!
        metadata.items = archive_get_items(metadata, repo_objs=self.repo_objs, repository=self.repository)
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.name = self.metadata.name
        self.comment = self.metadata.get("comment", "")
        self.tags = set(self.metadata.get("tags", []))

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get("time_end") or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start
            end = self.end
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            "name": self.name,
            "id": self.fpr,
            "start": OutputTimestamp(start),
            "end": OutputTimestamp(end),
            "duration": (end - start).total_seconds(),
            "stats": stats.as_dict(),
        }
        if self.create:
            info["command_line"] = join_cmd(sys.argv)
        else:
            info.update(
                {
                    "command_line": self.metadata.command_line,
                    "hostname": self.metadata.hostname,
                    "username": self.metadata.username,
                    "comment": self.metadata.get("comment", ""),
                    "tags": sorted(self.tags),
                    "chunker_params": self.metadata.get("chunker_params", ""),
                }
            )
        return info

    def __str__(self):
        return """\
Repository: {location}
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
""".format(
            self,
            start=OutputTimestamp(self.start),
            end=OutputTimestamp(self.end),
            location=self.repository._location.canonical_path(),
        )

    def __repr__(self):
        return "Archive(%r)" % self.name

    def item_filter(self, item, filter=None):
        return filter(item) if filter else True

    def iter_items(self, filter=None):
        yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))

    def preload_item_chunks(self, item, optimize_hardlinks=False):
        """
        Preloads item content data chunks from the repository.
        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)

    def add_item(self, item, show_progress=True, stats=None):
        if show_progress and self.show_progress:
            if stats is None:
                stats = self.stats
            stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
        name = name or self.name
        self.items_buffer.flush(flush=True)  # this adds the size of metadata stream chunks to stats.osize
        item_ptrs = archive_put_items(
            self.items_buffer.chunks, repo_objs=self.repo_objs, cache=self.cache, stats=self.stats
        )  # this adds the sizes of the item ptrs chunks to stats.osize
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = archive_ts_now()
            start = end - duration
        else:
            start = timestamp
            end = start + duration
        self.start = start
        self.end = end
        metadata = {
            "version": 2,
            "name": name,
            "comment": comment or "",
            "tags": list(sorted(self.tags)),
            "item_ptrs": item_ptrs,  # see #1473
            "command_line": join_cmd(sys.argv),
            "hostname": hostname,
            "username": getuser(),
            "time": start.isoformat(timespec="microseconds"),
            "time_end": end.isoformat(timespec="microseconds"),
            "chunker_params": self.chunker_params,
        }
        # we always want to create archives with the addtl. metadata (nfiles, etc.),
        # because borg info relies on them. so, either use the given stats (from args)
        # or fall back to self.stats if it was not given.
        stats = stats or self.stats
        metadata.update({"size": stats.osize, "nfiles": stats.nfiles})
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_metadata(metadata.as_dict())
        self.id = self.repo_objs.id_hash(data)
        try:
            self.cache.add_chunk(self.id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
        except IntegrityError as err:
            err_msg = str(err)
            # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
            if "More than allowed put data" in err_msg:
                raise Error("%s - archive too big (issue #1473)!" % err_msg)
            else:
                raise
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives.create(name, self.id, metadata.time)
        self.manifest.write()
        return metadata

    def calc_stats(self, cache, want_unique=True):
        stats = Statistics(iec=self.iec)
        stats.usize = 0  # this is expensive to compute
        stats.nfiles = self.metadata.nfiles
        stats.osize = self.metadata.size
        return stats

    @contextmanager
    def extract_helper(self, item, path, hlm, *, dry_run=False):
        hardlink_set = False
        # Hard link?
        if "hlid" in item:
            link_target = hlm.retrieve(id=item.hlid)
            if link_target is not None and has_link:
                if not dry_run:
                    # another hardlink to same inode (same hlid) was extracted previously, just link to it
                    with backup_io("link"):
                        os.link(link_target, path, follow_symlinks=False)
                hardlink_set = True
        yield hardlink_set
        if not hardlink_set:
            if "hlid" in item and has_link:
                # Update entry with extracted item path, so that following hardlinks don't extract twice.
                # We have hardlinking support, so we will hardlink not extract.
                hlm.remember(id=item.hlid, info=path)
            else:
                # Broken platform with no hardlinking support.
                # In this case, we *want* to extract twice, because there is no other way.
                pass

    def extract_item(
        self,
        item,
        *,
        restore_attrs=True,
        dry_run=False,
        stdout=False,
        sparse=False,
        hlm=None,
        pi=None,
        continue_extraction=False,
    ):
        """
        Extract archive item.
        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hlm: maps hlid to link_target for extracting subtrees with hardlinks correctly
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        :param continue_extraction: continue a previously interrupted extraction of same archive
        """

        def same_item(item, st):
            """is the archived item the same as the fs item at same path with stat st?"""
            if not stat.S_ISREG(st.st_mode):
                # we only "optimize" for regular files.
                # other file types are less frequent and have no content extraction we could "optimize away".
                return False
            if item.mode != st.st_mode or item.size != st.st_size:
                # the size check catches incomplete previous file extraction
                return False
            if item.get("mtime") != st.st_mtime_ns:
                # note: mtime is "extracted" late, after xattrs and ACLs, but before flags.
                return False
            # this is good enough for the intended use case:
            # continuing an extraction of same archive that initially started in an empty directory.
            # there is a very small risk that "bsdflags" of one file are wrong:
            # if a previous extraction was interrupted between setting the mtime and setting non-default flags.
            return True

        if dry_run or stdout:
            with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set:
                if not hardlink_set:
                    # it does not really set hardlinks due to dry_run, but we need to behave same
                    # as non-dry_run concerning fetching preloaded chunks from the pipeline or
                    # it would get stuck.
                    if "chunks" in item:
                        item_chunks_size = 0
                        for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                            if pi:
                                pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                            if stdout:
                                sys.stdout.buffer.write(data)
                            item_chunks_size += len(data)
                        if stdout:
                            sys.stdout.buffer.flush()
                        if "size" in item:
                            item_size = item.size
                            if item_size != item_chunks_size:
                                raise BackupError(
                                    "Size inconsistency detected: size {}, chunks size {}".format(
                                        item_size, item_chunks_size
                                    )
                                )
            return

        dest = self.cwd
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if continue_extraction and same_item(item, st):
                return  # done! we already have fully extracted this file in a previous run.
            elif stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io("makedirs"):
                make_parent(path)
            with self.extract_helper(item, path, hlm) as hardlink_set:
                if hardlink_set:
                    return
                with backup_io("open"):
                    fd = open(path, "wb")
                with fd:
                    for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io("write"):
                            if sparse and zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io("truncate_and_attrs"):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if "size" in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError(
                            f"Size inconsistency detected: size {item_size}, chunks size {item_chunks_size}"
                        )
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        # unusual, but possible: this is a hardlinked symlink.
                        return
                    target = item.target
                    try:
                        os.symlink(target, path)
                    except UnicodeEncodeError:
                        raise self.IncompatibleFilesystemEncodingError(target, sys.getfilesystemencoding()) from None
                    self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(item, path, hlm) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception("Unknown archive item type %r" % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.
        Does not access the repository.
        """
        backup_io.op = "attrs"
        # This code is a bit of a mess due to OS specific differences.
        if not is_win32:
            # by using uid_default = -1 and gid_default = -1, they will not be restored if
            # the archived item has no information about them.
            uid, gid = get_item_uid_gid(item, numeric=self.numeric_ids, uid_default=-1, gid_default=-1)
            # if uid and/or gid is -1, chown will keep it as is and not change it.
            try:
                if fd:
                    os.fchown(fd, uid, gid)
                else:
                    os.chown(path, uid, gid, follow_symlinks=False)
            except OSError:
                pass
            if fd:
                os.fchmod(fd, item.mode)
            else:
                # To check whether a particular function in the os module accepts False for its
                # follow_symlinks parameter, the in operator on supports_follow_symlinks should be
                # used. However, os.chmod is special as some platforms without a working lchmod() do
                # have fchmodat(), which has a flag that makes it behave like lchmod(). fchmodat()
                # is ignored when deciding whether or not os.chmod should be set in
                # os.supports_follow_symlinks. Work around this by using try/except.
                try:
                    os.chmod(path, item.mode, follow_symlinks=False)
                except NotImplementedError:
                    if not symlink:
                        os.chmod(path, item.mode)
            if not self.noacls:
                try:
                    acl_set(path, item, self.numeric_ids, fd=fd)
                except OSError as e:
                    if e.errno not in (errno.ENOTSUP,):
                        raise
            if not self.noxattrs and "xattrs" in item:
                # chown removes Linux capabilities, so set the extended attributes at the end, after chown,
                # since they include the Linux capabilities in the "security.capability" attribute.
                warning = xattr.set_all(fd or path, item.xattrs, follow_symlinks=False)
                if warning:
                    set_ec(EXIT_WARNING)
            # set timestamps rather late
            mtime = item.mtime
            atime = item.atime if "atime" in item else mtime
            if "birthtime" in item:
                birthtime = item.birthtime
                try:
                    # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms.
                    # See utimes(2) on either of the BSDs for details.
                    if fd:
                        os.utime(fd, None, ns=(atime, birthtime))
                    else:
                        os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False)
                except OSError:
                    # some systems don't support calling utime on a symlink
                    pass
            try:
                if fd:
                    os.utime(fd, None, ns=(atime, mtime))
                else:
                    os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
            except OSError:
                # some systems don't support calling utime on a symlink
                pass
            # bsdflags include the immutable flag and need to be set last:
            if not self.noflags and "bsdflags" in item:
                try:
                    set_flags(path, item.bsdflags, fd=fd)
                except OSError:
                    pass
        else:  # win32
            # set timestamps rather late
            mtime = item.mtime
            atime = item.atime if "atime" in item else mtime
            try:
                # note: no fd support on win32
                os.utime(path, None, ns=(atime, mtime))
            except OSError:
                # some systems don't support calling utime on a symlink
                pass

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        if "items" in metadata:
            del metadata.items
        data = self.key.pack_metadata(metadata.as_dict())
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
        self.manifest.archives.create(self.name, new_id, metadata.time, overwrite=True)
        self.id = new_id

    def rename(self, name):
        old_id = self.id
        self.name = name
        self.set_meta("name", name)
        self.manifest.archives.delete_by_id(old_id)

    def delete(self):
        # quick and dirty: we just nuke the archive from the archives list - that will
        # potentially orphan all chunks previously referenced by the archive, except the ones also
        # referenced by other archives. In the end, "borg compact" will clean up and free space.
        self.manifest.archives.delete_by_id(self.id)

    @staticmethod
    def compare_archives_iter(
        archive1: "Archive", archive2: "Archive", matcher=None, can_compare_chunk_ids=False
    ) -> Iterator[ItemDiff]:
        """
        Yields an ItemDiff instance describing changes/indicating equality.
        :param matcher: PatternMatcher class to restrict results to only matching paths.
        :param can_compare_chunk_ids: Whether --chunker-params are the same for both archives.
        """

        def compare_items(path: str, item1: Item, item2: Item):
            return ItemDiff(
                path,
                item1,
                item2,
                archive1.pipeline.fetch_many(item1.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
                archive2.pipeline.fetch_many(item2.get("chunks", []), ro_type=ROBJ_FILE_STREAM),
                can_compare_chunk_ids=can_compare_chunk_ids,
            )

        orphans_archive1: OrderedDict[str, Item] = OrderedDict()
        orphans_archive2: OrderedDict[str, Item] = OrderedDict()
        assert matcher is not None, "matcher must be set"
        for item1, item2 in zip_longest(
            archive1.iter_items(lambda item: matcher.match(item.path)),
            archive2.iter_items(lambda item: matcher.match(item.path)),
        ):
            if item1 and item2 and item1.path == item2.path:
                yield compare_items(item1.path, item1, item2)
                continue
            if item1:
                matching_orphan = orphans_archive2.pop(item1.path, None)
                if matching_orphan:
                    yield compare_items(item1.path, item1, matching_orphan)
                else:
                    orphans_archive1[item1.path] = item1
            if item2:
                matching_orphan = orphans_archive1.pop(item2.path, None)
                if matching_orphan:
                    yield compare_items(matching_orphan.path, matching_orphan, item2)
                else:
                    orphans_archive2[item2.path] = item2
        # At this point orphans_* contain items that had no matching partner in the other archive
        for added in orphans_archive2.values():
            path = added.path
            deleted_item = Item.create_deleted(path)
            yield compare_items(path, deleted_item, added)
        for deleted in orphans_archive1.values():
            path = deleted.path
            deleted_item = Item.create_deleted(path)
            yield compare_items(path, deleted, deleted_item)


class MetadataCollector:
    def __init__(self, *, noatime, noctime, nobirthtime, numeric_ids, noflags, noacls, noxattrs):
        self.noatime = noatime
        self.noctime = noctime
        self.numeric_ids = numeric_ids
        self.noflags = noflags
        self.noacls = noacls
        self.noxattrs = noxattrs
        self.nobirthtime = nobirthtime

    def stat_simple_attrs(self, st, path, fd=None):
        attrs = {}
        attrs["mode"] = st.st_mode
        # borg can work with archives only having mtime (very old borg archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        attrs["mtime"] = safe_ns(st.st_mtime_ns)
        if not self.noatime:
            attrs["atime"] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs["ctime"] = safe_ns(st.st_ctime_ns)
        if not self.nobirthtime:
            birthtime_ns = get_birthtime_ns(st, path, fd=fd)
            if birthtime_ns is not None:
                attrs["birthtime"] = safe_ns(birthtime_ns)
        attrs["uid"] = st.st_uid
        attrs["gid"] = st.st_gid
        if not self.numeric_ids:
            user = uid2user(st.st_uid)
            if user is not None:
                attrs["user"] = user
            group = gid2group(st.st_gid)
            if group is not None:
                attrs["group"] = group
        if st.st_ino > 0:
            attrs["inode"] = st.st_ino
        return attrs

    def stat_ext_attrs(self, st, path, fd=None):
        attrs = {}
        if not self.noflags:
            with backup_io("extended stat (flags)"):
                flags = get_flags(path, st, fd=fd)
            attrs["bsdflags"] = flags
        if not self.noxattrs:
            with backup_io("extended stat (xattrs)"):
                xattrs = xattr.get_all(fd or path, follow_symlinks=False)
            attrs["xattrs"] = StableDict(xattrs)
        if not self.noacls:
            with backup_io("extended stat (ACLs)"):
                try:
                    acl_get(path, attrs, st, self.numeric_ids, fd=fd)
                except OSError as e:
                    if e.errno not in (errno.ENOTSUP,):
                        raise
        return attrs

    def stat_attrs(self, st, path, fd=None):
        attrs = self.stat_simple_attrs(st, path, fd=fd)
        attrs.update(self.stat_ext_attrs(st, path, fd=fd))
        return attrs
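
# Illustrative note (not part of the original module): MetadataCollector.stat_attrs() returns a
# plain dict (mode, mtime, optionally atime/ctime/birthtime, uid/gid, optionally user/group,
# inode, plus bsdflags/xattrs/ACLs unless disabled) that callers merge into an Item via
# item.update(...), as done by the FilesystemObjectProcessors methods below.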


# remember a few recently used all-zero chunk hashes in this mapping.
# (hash_func, chunk_length) -> chunk_hash
# we play safe and have the hash_func in the mapping key, in case we
# have different hash_funcs within the same borg run.
zero_chunk_ids = LRUCache(10)  # type: ignore[var-annotated]


def cached_hash(chunk, id_hash):
    allocation = chunk.meta["allocation"]
    if allocation == CH_DATA:
        data = chunk.data
        chunk_id = id_hash(data)
    elif allocation in (CH_HOLE, CH_ALLOC):
        size = chunk.meta["size"]
        assert size <= len(zeros)
        data = memoryview(zeros)[:size]
        try:
            chunk_id = zero_chunk_ids[(id_hash, size)]
        except KeyError:
            chunk_id = id_hash(data)
            zero_chunk_ids[(id_hash, size)] = chunk_id
    else:
        raise ValueError("unexpected allocation type")
    return chunk_id, data
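
# Illustrative note (not part of the original module): for CH_ALLOC / CH_HOLE chunks, cached_hash()
# returns a zero-filled memoryview of the requested size and memoizes its id under (id_hash, size)
# in zero_chunk_ids, so repeated holes of the same size are not hashed again.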


class ChunksProcessor:
    # Processes an iterator of chunks for an Item
    def __init__(self, *, key, cache, add_item, rechunkify):
        self.key = key
        self.cache = cache
        self.add_item = add_item
        self.rechunkify = rechunkify

    def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
        if not chunk_processor:

            def chunk_processor(chunk):
                started_hashing = time.monotonic()
                chunk_id, data = cached_hash(chunk, self.key.id_hash)
                stats.hashing_time += time.monotonic() - started_hashing
                chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

        item.chunks = []
        for chunk in chunk_iter:
            chunk_entry = chunk_processor(chunk)
            item.chunks.append(chunk_entry)
            if show_progress:
                stats.show_progress(item=item, dt=0.2)


def maybe_exclude_by_attr(item):
    if xattrs := item.get("xattrs"):
        apple_excluded = xattrs.get(b"com.apple.metadata:com_apple_backup_excludeItem")
        linux_excluded = xattrs.get(b"user.xdg.robots.backup")
        if apple_excluded is not None or linux_excluded == b"true":
            raise BackupItemExcluded
    if flags := item.get("bsdflags"):
        if flags & stat.UF_NODUMP:
            raise BackupItemExcluded


class FilesystemObjectProcessors:
    # When ported to threading, then this doesn't need chunker, cache, key any more.
    # process_file becomes a callback passed to __init__.

    def __init__(
        self,
        *,
        metadata_collector,
        cache,
        key,
        add_item,
        process_file_chunks,
        chunker_params,
        show_progress,
        sparse,
        log_json,
        iec,
        file_status_printer=None,
        files_changed="ctime",
    ):
        self.metadata_collector = metadata_collector
        self.cache = cache
        self.key = key
        self.add_item = add_item
        self.process_file_chunks = process_file_chunks
        self.show_progress = show_progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.files_changed = files_changed
        self.hlm = HardLinkManager(id_type=tuple, info_type=(list, type(None)))  # (dev, ino) -> chunks or None
        self.stats = Statistics(output_json=log_json, iec=iec)  # threading: done by cache (including progress)
        self.cwd = os.getcwd()
        self.chunker = get_chunker(*chunker_params, key=key, sparse=sparse)

    @contextmanager
    def create_helper(self, path, st, status=None, hardlinkable=True, strip_prefix=None):
        if strip_prefix is not None:
            assert not path.endswith(os.sep)
            if strip_prefix.startswith(path + os.sep):
                # still on a directory level that shall be stripped - do not create an item for this!
                yield None, "x", False, None
                return
            # adjust path, remove stripped directory levels
            path = path.removeprefix(strip_prefix)
        sanitized_path = remove_dotdot_prefixes(path)
        item = Item(path=sanitized_path)
        hardlinked = hardlinkable and st.st_nlink > 1
        hl_chunks = None
        update_map = False
        if hardlinked:
            status = "h"  # hardlink
            nothing = object()
            chunks = self.hlm.retrieve(id=(st.st_ino, st.st_dev), default=nothing)
            if chunks is nothing:
                update_map = True
            elif chunks is not None:
                hl_chunks = chunks
            item.hlid = self.hlm.hardlink_id_from_inode(ino=st.st_ino, dev=st.st_dev)
        yield item, status, hardlinked, hl_chunks
        maybe_exclude_by_attr(item)
        self.add_item(item, stats=self.stats)
        if update_map:
            # remember the hlid of this fs object and if the item has chunks,
            # also remember them, so we do not have to re-chunk a hardlink.
            chunks = item.chunks if "chunks" in item else None
            self.hlm.remember(id=(st.st_ino, st.st_dev), info=chunks)

    def process_dir_with_fd(self, *, path, fd, st, strip_prefix):
        with self.create_helper(path, st, "d", hardlinkable=False, strip_prefix=strip_prefix) as (
            item,
            status,
            hardlinked,
            hl_chunks,
        ):
            if item is not None:
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
            return status

    def process_dir(self, *, path, parent_fd, name, st, strip_prefix):
        with self.create_helper(path, st, "d", hardlinkable=False, strip_prefix=strip_prefix) as (
            item,
            status,
            hardlinked,
            hl_chunks,
        ):
            if item is None:
                return status
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir, noatime=True, op="dir_open") as fd:
                # fd is None for directories on windows, in that case a race condition check is not possible.
                if fd is not None:
                    with backup_io("fstat"):
                        st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_fifo(self, *, path, parent_fd, name, st, strip_prefix):
        with self.create_helper(path, st, "f", strip_prefix=strip_prefix) as (
            item,
            status,
            hardlinked,
            hl_chunks,
        ):  # fifo
            if item is None:
                return status
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
                with backup_io("fstat"):
                    st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_dev(self, *, path, parent_fd, name, st, dev_type, strip_prefix):
        with self.create_helper(path, st, dev_type, strip_prefix=strip_prefix) as (
            item,
            status,
            hardlinked,
            hl_chunks,
        ):  # char/block device
            # looks like we can not work fd-based here without causing issues when trying to open/close the device
            if item is None:
                return status
            with backup_io("stat"):
                st = stat_update_check(st, os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False))
            item.rdev = st.st_rdev
            item.update(self.metadata_collector.stat_attrs(st, path))
            return status
  1196. def process_symlink(self, *, path, parent_fd, name, st, strip_prefix):
  1197. with self.create_helper(path, st, "s", hardlinkable=True, strip_prefix=strip_prefix) as (
  1198. item,
  1199. status,
  1200. hardlinked,
  1201. hl_chunks,
  1202. ):
  1203. if item is None:
  1204. return status
  1205. fname = name if name is not None and parent_fd is not None else path
  1206. with backup_io("readlink"):
  1207. target = os.readlink(fname, dir_fd=parent_fd)
  1208. item.target = target
  1209. item.update(self.metadata_collector.stat_attrs(st, path)) # can't use FD here?
  1210. return status
  1211. def process_pipe(self, *, path, cache, fd, mode, user=None, group=None):
  1212. status = "i" # stdin (or other pipe)
  1213. self.print_file_status(status, path)
  1214. status = None # we already printed the status
  1215. if user is not None:
  1216. uid = user2uid(user)
  1217. if uid is None:
  1218. raise Error("no such user: %s" % user)
  1219. else:
  1220. uid = None
  1221. if group is not None:
  1222. gid = group2gid(group)
  1223. if gid is None:
  1224. raise Error("no such group: %s" % group)
  1225. else:
  1226. gid = None
  1227. t = int(time.time()) * 1000000000
  1228. item = Item(path=path, mode=mode & 0o107777 | 0o100000, mtime=t, atime=t, ctime=t) # forcing regular file mode
  1229. if user is not None:
  1230. item.user = user
  1231. if group is not None:
  1232. item.group = group
  1233. if uid is not None:
  1234. item.uid = uid
  1235. if gid is not None:
  1236. item.gid = gid
  1237. self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
  1238. item.get_size(memorize=True)
  1239. self.stats.nfiles += 1
  1240. self.add_item(item, stats=self.stats)
  1241. return status
  1242. def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, last_try=False, strip_prefix):
  1243. with self.create_helper(path, st, None, strip_prefix=strip_prefix) as (
  1244. item,
  1245. status,
  1246. hardlinked,
  1247. hl_chunks,
  1248. ): # no status yet
  1249. if item is None:
  1250. return status
  1251. with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
  1252. with backup_io("fstat"):
  1253. st = stat_update_check(st, os.fstat(fd))
  1254. item.update(self.metadata_collector.stat_simple_attrs(st, path, fd=fd))
  1255. item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
  1256. maybe_exclude_by_attr(item) # check early, before processing all the file content
  1257. is_special_file = is_special(st.st_mode)
  1258. if is_special_file:
  1259. # we process a special file like a regular file. reflect that in mode,
  1260. # so it can be extracted / accessed in FUSE mount like a regular file.
  1261. # this needs to be done early, so that part files also get the patched mode.
  1262. item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
  1263. # we begin processing chunks now.
  1264. if hl_chunks is not None: # create_helper gave us chunks from a previous hardlink
  1265. item.chunks = []
  1266. for chunk_id, chunk_size in hl_chunks:
  1267. # process one-by-one, so we will know in item.chunks how far we got
  1268. chunk_entry = cache.reuse_chunk(chunk_id, chunk_size, self.stats)
  1269. item.chunks.append(chunk_entry)
  1270. else: # normal case, no "2nd+" hardlink
  1271. if not is_special_file:
  1272. hashed_path = safe_encode(item.path) # path as in archive item!
  1273. started_hashing = time.monotonic()
  1274. path_hash = self.key.id_hash(hashed_path)
  1275. self.stats.hashing_time += time.monotonic() - started_hashing
  1276. known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
  1277. else:
  1278. # in --read-special mode, we may be called for special files.
  1279. # there should be no information in the cache about special files processed in
  1280. # read-special mode, but we better play safe as this was wrong in the past:
  1281. hashed_path = path_hash = None
  1282. known, chunks = False, None
  1283. if chunks is not None:
  1284. # Make sure all ids are available
  1285. for chunk in chunks:
  1286. if not cache.seen_chunk(chunk.id):
  1287. # cache said it is unmodified, but we lost a chunk: process file like modified
  1288. status = "M"
  1289. break
  1290. else:
  1291. item.chunks = []
  1292. for chunk in chunks:
  1293. # process one-by-one, so we will know in item.chunks how far we got
  1294. cache.reuse_chunk(chunk.id, chunk.size, self.stats)
  1295. item.chunks.append(chunk)
  1296. status = "U" # regular file, unchanged
  1297. else:
  1298. status = "M" if known else "A" # regular file, modified or added
  1299. self.print_file_status(status, path)
  1300. # Only chunkify the file if needed
  1301. changed_while_backup = False
  1302. if "chunks" not in item:
  1303. start_reading = time.time_ns()
  1304. with backup_io("read"):
  1305. self.process_file_chunks(
  1306. item,
  1307. cache,
  1308. self.stats,
  1309. self.show_progress,
  1310. backup_io_iter(self.chunker.chunkify(None, fd)),
  1311. )
  1312. self.stats.chunking_time = self.chunker.chunking_time
  1313. end_reading = time.time_ns()
  1314. if not is_win32: # TODO for win32
  1315. with backup_io("fstat2"):
  1316. st2 = os.fstat(fd)
  1317. if self.files_changed == "disabled" or is_special_file:
  1318. # special files:
  1319. # - fifos change naturally, because they are fed from the other side. no problem.
  1320. # - blk/chr devices don't change ctime anyway.
  1321. pass
  1322. elif self.files_changed == "ctime":
  1323. if st.st_ctime_ns != st2.st_ctime_ns:
  1324. # ctime was changed, this is either a metadata or a data change.
  1325. changed_while_backup = True
  1326. elif (
  1327. start_reading - TIME_DIFFERS1_NS < st2.st_ctime_ns < end_reading + TIME_DIFFERS1_NS
  1328. ):
  1329. # this is to treat a very special race condition, see #3536.
  1330. # - file was changed right before st.ctime was determined.
  1331. # - then, shortly afterwards, but already while we read the file, the
  1332. # file was changed again, but st2.ctime is the same due to ctime granularity.
  1333. # when comparing file ctime to local clock, widen interval by TIME_DIFFERS1_NS.
  1334. changed_while_backup = True
  1335. elif self.files_changed == "mtime":
  1336. if st.st_mtime_ns != st2.st_mtime_ns:
  1337. # mtime was changed, this is either a data change.
  1338. changed_while_backup = True
  1339. elif (
  1340. start_reading - TIME_DIFFERS1_NS < st2.st_mtime_ns < end_reading + TIME_DIFFERS1_NS
  1341. ):
  1342. # this is to treat a very special race condition, see #3536.
  1343. # - file was changed right before st.mtime was determined.
  1344. # - then, shortly afterwards, but already while we read the file, the
  1345. # file was changed again, but st2.mtime is the same due to mtime granularity.
  1346. # when comparing file mtime to local clock, widen interval by TIME_DIFFERS1_NS.
  1347. changed_while_backup = True
  1348. if changed_while_backup:
  1349. # regular file changed while we backed it up, might be inconsistent/corrupt!
  1350. if last_try:
  1351. status = "C" # crap! retries did not help.
  1352. else:
  1353. raise BackupError("file changed while we read it!")
  1354. if not is_special_file and not changed_while_backup:
  1355. # we must not memorize special files, because the contents of e.g. a
  1356. # block or char device will change without its mtime/size/inode changing.
  1357. # also, we must not memorize a potentially inconsistent/corrupt file that
  1358. # changed while we backed it up.
  1359. cache.memorize_file(hashed_path, path_hash, st, item.chunks)
  1360. self.stats.files_stats[status] += 1 # must be done late
  1361. if not changed_while_backup:
  1362. status = None # we already called print_file_status
  1363. self.stats.nfiles += 1
  1364. item.get_size(memorize=True)
  1365. return status
  1366. class TarfileObjectProcessors:
  1367. def __init__(
  1368. self,
  1369. *,
  1370. cache,
  1371. key,
  1372. add_item,
  1373. process_file_chunks,
  1374. chunker_params,
  1375. show_progress,
  1376. log_json,
  1377. iec,
  1378. file_status_printer=None,
  1379. ):
  1380. self.cache = cache
  1381. self.key = key
  1382. self.add_item = add_item
  1383. self.process_file_chunks = process_file_chunks
  1384. self.show_progress = show_progress
  1385. self.print_file_status = file_status_printer or (lambda *args: None)
  1386. self.stats = Statistics(output_json=log_json, iec=iec) # threading: done by cache (including progress)
  1387. self.chunker = get_chunker(*chunker_params, key=key, sparse=False)
  1388. self.hlm = HardLinkManager(id_type=str, info_type=list) # normalized/safe path -> chunks
  1389. @contextmanager
  1390. def create_helper(self, tarinfo, status=None, type=None):
  1391. ph = tarinfo.pax_headers
  1392. if ph and "BORG.item.version" in ph:
  1393. assert ph["BORG.item.version"] == "1"
  1394. meta_bin = base64.b64decode(ph["BORG.item.meta"])
  1395. meta_dict = msgpack.unpackb(meta_bin, object_hook=StableDict)
  1396. item = Item(internal_dict=meta_dict)
  1397. else:
  1398. def s_to_ns(s):
  1399. return safe_ns(int(float(s) * 1e9))
  1400. # if the tar has names starting with "./", normalize them like borg create also does.
  1401. # ./dir/file must become dir/file in the borg archive.
  1402. normalized_path = os.path.normpath(tarinfo.name)
  1403. item = Item(
  1404. path=make_path_safe(normalized_path),
  1405. mode=tarinfo.mode | type,
  1406. uid=tarinfo.uid,
  1407. gid=tarinfo.gid,
  1408. mtime=s_to_ns(tarinfo.mtime),
  1409. )
  1410. if tarinfo.uname:
  1411. item.user = tarinfo.uname
  1412. if tarinfo.gname:
  1413. item.group = tarinfo.gname
  1414. if ph:
  1415. # note: for mtime this is a bit redundant as it is already done by tarfile module,
  1416. # but we just do it in our way to be consistent for sure.
  1417. for name in "atime", "ctime", "mtime":
  1418. if name in ph:
  1419. ns = s_to_ns(ph[name])
  1420. setattr(item, name, ns)
  1421. xattrs = StableDict()
  1422. for key, value in ph.items():
  1423. if key.startswith(SCHILY_XATTR):
  1424. key = key.removeprefix(SCHILY_XATTR)
  1425. # the tarfile code gives us str keys and str values,
  1426. # but we need bytes keys and bytes values.
  1427. bkey = key.encode("utf-8", errors="surrogateescape")
  1428. bvalue = value.encode("utf-8", errors="surrogateescape")
  1429. xattrs[bkey] = bvalue
  1430. if xattrs:
  1431. item.xattrs = xattrs
  1432. yield item, status
  1433. # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
  1434. self.add_item(item, stats=self.stats)
  1435. def process_dir(self, *, tarinfo, status, type):
  1436. with self.create_helper(tarinfo, status, type) as (item, status):
  1437. return status
  1438. def process_fifo(self, *, tarinfo, status, type):
  1439. with self.create_helper(tarinfo, status, type) as (item, status):
  1440. return status
  1441. def process_dev(self, *, tarinfo, status, type):
  1442. with self.create_helper(tarinfo, status, type) as (item, status):
  1443. item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor)
  1444. return status
  1445. def process_symlink(self, *, tarinfo, status, type):
  1446. with self.create_helper(tarinfo, status, type) as (item, status):
  1447. item.target = tarinfo.linkname
  1448. return status
  1449. def process_hardlink(self, *, tarinfo, status, type):
  1450. with self.create_helper(tarinfo, status, type) as (item, status):
  1451. # create a not hardlinked borg item, reusing the chunks, see HardLinkManager.__doc__
  1452. normalized_path = os.path.normpath(tarinfo.linkname)
  1453. safe_path = make_path_safe(normalized_path)
  1454. chunks = self.hlm.retrieve(safe_path)
  1455. if chunks is not None:
  1456. item.chunks = chunks
  1457. item.get_size(memorize=True, from_chunks=True)
  1458. self.stats.nfiles += 1
  1459. return status
  1460. def process_file(self, *, tarinfo, status, type, tar):
  1461. with self.create_helper(tarinfo, status, type) as (item, status):
  1462. self.print_file_status(status, item.path)
  1463. status = None # we already printed the status
  1464. fd = tar.extractfile(tarinfo)
  1465. self.process_file_chunks(
  1466. item, self.cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
  1467. )
  1468. item.get_size(memorize=True, from_chunks=True)
  1469. self.stats.nfiles += 1
  1470. # we need to remember ALL files, see HardLinkManager.__doc__
  1471. self.hlm.remember(id=item.path, info=item.chunks)
  1472. return status
  1473. def valid_msgpacked_dict(d, keys_serialized):
  1474. """check if the data <d> looks like a msgpacked dict"""
  1475. d_len = len(d)
  1476. if d_len == 0:
  1477. return False
  1478. if d[0] & 0xF0 == 0x80: # object is a fixmap (up to 15 elements)
  1479. offs = 1
  1480. elif d[0] == 0xDE: # object is a map16 (up to 2^16-1 elements)
  1481. offs = 3
  1482. else:
  1483. # object is not a map (dict)
  1484. # note: we must not have dicts with > 2^16-1 elements
  1485. return False
  1486. if d_len <= offs:
  1487. return False
  1488. # is the first dict key a bytestring?
  1489. if d[offs] & 0xE0 == 0xA0: # key is a small bytestring (up to 31 chars)
  1490. pass
  1491. elif d[offs] in (0xD9, 0xDA, 0xDB): # key is a str8, str16 or str32
  1492. pass
  1493. else:
  1494. # key is not a bytestring
  1495. return False
  1496. # is the bytestring any of the expected key names?
  1497. key_serialized = d[offs:]
  1498. return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
  1499. class RobustUnpacker:
  1500. """A restartable/robust version of the streaming msgpack unpacker"""
  1501. def __init__(self, validator, item_keys):
  1502. super().__init__()
  1503. self.item_keys = [msgpack.packb(name) for name in item_keys]
  1504. self.validator = validator
  1505. self._buffered_data = []
  1506. self._resync = False
  1507. self._unpacker = msgpack.Unpacker(object_hook=StableDict)
  1508. def resync(self):
  1509. self._buffered_data = []
  1510. self._resync = True
  1511. def feed(self, data):
  1512. if self._resync:
  1513. self._buffered_data.append(data)
  1514. else:
  1515. self._unpacker.feed(data)
  1516. def __iter__(self):
  1517. return self
  1518. def __next__(self):
  1519. if self._resync:
  1520. data = b"".join(self._buffered_data)
  1521. while self._resync:
  1522. if not data:
  1523. raise StopIteration
  1524. # Abort early if the data does not look like a serialized item dict
  1525. if not valid_msgpacked_dict(data, self.item_keys):
  1526. data = data[1:]
  1527. continue
  1528. self._unpacker = msgpack.Unpacker(object_hook=StableDict)
  1529. self._unpacker.feed(data)
  1530. try:
  1531. item = next(self._unpacker)
  1532. except (msgpack.UnpackException, StopIteration):
  1533. # as long as we are resyncing, we also ignore StopIteration
  1534. pass
  1535. else:
  1536. if self.validator(item):
  1537. self._resync = False
  1538. return item
  1539. data = data[1:]
  1540. else:
  1541. return next(self._unpacker)
  1542. class ArchiveChecker:
  1543. def __init__(self):
  1544. self.error_found = False
  1545. self.key = None
  1546. def check(
  1547. self,
  1548. repository,
  1549. *,
  1550. verify_data=False,
  1551. repair=False,
  1552. find_lost_archives=False,
  1553. match=None,
  1554. sort_by="",
  1555. first=0,
  1556. last=0,
  1557. older=None,
  1558. newer=None,
  1559. oldest=None,
  1560. newest=None,
  1561. ):
  1562. """Perform a set of checks on 'repository'
  1563. :param repair: enable repair mode, write updated or corrected data into repository
  1564. :param find_lost_archives: create archive directory entries that are missing
  1565. :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
  1566. :param match: only check archives matching this pattern
  1567. :param older/newer: only check archives older/newer than timedelta from now
  1568. :param oldest/newest: only check archives older/newer than timedelta from oldest/newest archive timestamp
  1569. :param verify_data: integrity verification of data referenced by archives
  1570. """
  1571. if not isinstance(repository, (Repository, RemoteRepository)):
  1572. logger.error("Checking legacy repositories is not supported.")
  1573. return False
  1574. logger.info("Starting archive consistency check...")
  1575. self.check_all = not any((first, last, match, older, newer, oldest, newest))
  1576. self.repair = repair
  1577. self.repository = repository
  1578. # Repository.check already did a full repository-level check and has built and cached a fresh chunkindex -
  1579. # we can use that here, so we don't disable the caches (also no need to cache immediately, again):
  1580. self.chunks = build_chunkindex_from_repo(self.repository, disable_caches=False, cache_immediately=False)
  1581. if self.key is None:
  1582. self.key = self.make_key(repository)
  1583. self.repo_objs = RepoObj(self.key)
  1584. if verify_data:
  1585. self.verify_data()
  1586. rebuild_manifest = False
  1587. try:
  1588. repository.get_manifest()
  1589. except NoManifestError:
  1590. logger.error("Repository manifest is missing.")
  1591. self.error_found = True
  1592. rebuild_manifest = True
  1593. else:
  1594. try:
  1595. self.manifest = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
  1596. except IntegrityErrorBase as exc:
  1597. logger.error("Repository manifest is corrupted: %s", exc)
  1598. self.error_found = True
  1599. rebuild_manifest = True
  1600. if rebuild_manifest:
  1601. self.manifest = self.rebuild_manifest()
  1602. if find_lost_archives:
  1603. self.rebuild_archives_directory()
  1604. self.rebuild_archives(
  1605. match=match, first=first, last=last, sort_by=sort_by, older=older, oldest=oldest, newer=newer, newest=newest
  1606. )
  1607. self.finish()
  1608. if self.error_found:
  1609. logger.error("Archive consistency check complete, problems found.")
  1610. else:
  1611. logger.info("Archive consistency check complete, no problems found.")
  1612. return self.repair or not self.error_found
  1613. def make_key(self, repository, manifest_only=False):
  1614. attempt = 0
  1615. # try the manifest first!
  1616. try:
  1617. cdata = repository.get_manifest()
  1618. except NoManifestError:
  1619. pass
  1620. else:
  1621. try:
  1622. return key_factory(repository, cdata)
  1623. except UnsupportedPayloadError:
  1624. # we get here, if the cdata we got has a corrupted key type byte
  1625. pass # ignore it, just continue trying
  1626. if not manifest_only:
  1627. for chunkid, _ in self.chunks.iteritems():
  1628. attempt += 1
  1629. if attempt > 999:
  1630. # we did a lot of attempts, but could not create the key via key_factory, give up.
  1631. break
  1632. cdata = repository.get(chunkid)
  1633. try:
  1634. return key_factory(repository, cdata)
  1635. except UnsupportedPayloadError:
  1636. # we get here, if the cdata we got has a corrupted key type byte
  1637. pass # ignore it, just try the next chunk
  1638. if attempt == 0:
  1639. if manifest_only:
  1640. msg = "make_key: failed to create the key (tried only the manifest)"
  1641. else:
  1642. msg = "make_key: repository has no chunks at all!"
  1643. else:
  1644. msg = "make_key: failed to create the key (tried %d chunks)" % attempt
  1645. raise IntegrityError(msg)
  1646. def verify_data(self):
  1647. logger.info("Starting cryptographic data integrity verification...")
  1648. chunks_count = len(self.chunks)
  1649. errors = 0
  1650. defect_chunks = []
  1651. pi = ProgressIndicatorPercent(
  1652. total=chunks_count, msg="Verifying data %6.2f%%", step=0.01, msgid="check.verify_data"
  1653. )
  1654. for chunk_id, _ in self.chunks.iteritems():
  1655. pi.show()
  1656. try:
  1657. encrypted_data = self.repository.get(chunk_id)
  1658. except (Repository.ObjectNotFound, IntegrityErrorBase) as err:
  1659. self.error_found = True
  1660. errors += 1
  1661. logger.error("chunk %s: %s", bin_to_hex(chunk_id), err)
  1662. if isinstance(err, IntegrityErrorBase):
  1663. defect_chunks.append(chunk_id)
  1664. else:
  1665. try:
  1666. # we must decompress, so it'll call assert_id() in there:
  1667. self.repo_objs.parse(chunk_id, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
  1668. except IntegrityErrorBase as integrity_error:
  1669. self.error_found = True
  1670. errors += 1
  1671. logger.error("chunk %s, integrity error: %s", bin_to_hex(chunk_id), integrity_error)
  1672. defect_chunks.append(chunk_id)
  1673. pi.finish()
  1674. if defect_chunks:
  1675. if self.repair:
  1676. # if we kill the defect chunk here, subsequent actions within this "borg check"
  1677. # run will find missing chunks.
  1678. logger.warning(
  1679. "Found defect chunks and will delete them now. "
  1680. "Reading files referencing these chunks will result in an I/O error."
  1681. )
  1682. for defect_chunk in defect_chunks:
  1683. # remote repo (ssh): retry might help for strange network / NIC / RAM errors
  1684. # as the chunk will be retransmitted from remote server.
  1685. # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
  1686. # a defect chunk is likely not in the fs cache any more and really gets re-read
  1687. # from the underlying media.
  1688. try:
  1689. encrypted_data = self.repository.get(defect_chunk)
  1690. # we must decompress, so it'll call assert_id() in there:
  1691. self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
  1692. except IntegrityErrorBase:
  1693. # failed twice -> get rid of this chunk
  1694. del self.chunks[defect_chunk]
  1695. self.repository.delete(defect_chunk)
  1696. logger.debug("chunk %s deleted.", bin_to_hex(defect_chunk))
  1697. else:
  1698. logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk))
  1699. else:
  1700. logger.warning("Found defect chunks. With --repair, they would get deleted.")
  1701. for defect_chunk in defect_chunks:
  1702. logger.debug("chunk %s is defect.", bin_to_hex(defect_chunk))
  1703. log = logger.error if errors else logger.info
  1704. log(
  1705. "Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.",
  1706. chunks_count,
  1707. errors,
  1708. )
  1709. def rebuild_manifest(self):
  1710. """Rebuild the manifest object."""
  1711. logger.info("Rebuilding missing/corrupted manifest.")
  1712. # as we have lost the manifest, we do not know any more what valid item keys we had.
  1713. # collecting any key we encounter in a damaged repo seems unwise, thus we just use
  1714. # the hardcoded list from the source code. thus, it is not recommended to rebuild a
  1715. # lost manifest on a older borg version than the most recent one that was ever used
  1716. # within this repository (assuming that newer borg versions support more item keys).
  1717. return Manifest(self.key, self.repository)
  1718. def rebuild_archives_directory(self):
  1719. """Rebuild the archives directory, undeleting archives.
  1720. Iterates through all objects in the repository looking for archive metadata blocks.
  1721. When finding some that do not have a corresponding archives directory entry (either
  1722. a normal entry for an "existing" archive, or a soft-deleted entry for a "deleted"
  1723. archive), it will create that entry (making the archives directory consistent with
  1724. the repository).
  1725. """
  1726. def valid_archive(obj):
  1727. if not isinstance(obj, dict):
  1728. return False
  1729. return REQUIRED_ARCHIVE_KEYS.issubset(obj)
  1730. logger.info("Rebuilding missing archives directory entries, this might take some time...")
  1731. pi = ProgressIndicatorPercent(
  1732. total=len(self.chunks),
  1733. msg="Rebuilding missing archives directory entries %6.2f%%",
  1734. step=0.01,
  1735. msgid="check.rebuild_archives_directory",
  1736. )
  1737. for chunk_id, _ in self.chunks.iteritems():
  1738. pi.show()
  1739. cdata = self.repository.get(chunk_id, read_data=False) # only get metadata
  1740. try:
  1741. meta = self.repo_objs.parse_meta(chunk_id, cdata, ro_type=ROBJ_DONTCARE)
  1742. except IntegrityErrorBase as exc:
  1743. logger.error("Skipping corrupted chunk: %s", exc)
  1744. self.error_found = True
  1745. continue
  1746. if meta["type"] != ROBJ_ARCHIVE_META:
  1747. continue
  1748. # now we know it is an archive metadata chunk, load the full object from the repo:
  1749. cdata = self.repository.get(chunk_id)
  1750. try:
  1751. meta, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE)
  1752. except IntegrityErrorBase as exc:
  1753. logger.error("Skipping corrupted chunk: %s", exc)
  1754. self.error_found = True
  1755. continue
  1756. if meta["type"] != ROBJ_ARCHIVE_META:
  1757. continue # should never happen
  1758. try:
  1759. archive = msgpack.unpackb(data)
  1760. # Ignore exceptions that might be raised when feeding msgpack with invalid data
  1761. except msgpack.UnpackException:
  1762. continue
  1763. if valid_archive(archive):
  1764. archive = self.key.unpack_archive(data)
  1765. archive = ArchiveItem(internal_dict=archive)
  1766. name = archive.name
  1767. archive_id, archive_id_hex = chunk_id, bin_to_hex(chunk_id)
  1768. if self.manifest.archives.exists_id(archive_id, deleted=False):
  1769. logger.debug(f"We already have an archives directory entry for {name} {archive_id_hex}.")
  1770. elif self.manifest.archives.exists_id(archive_id, deleted=True):
  1771. logger.debug(
  1772. f"We already have a soft-deleted archives directory entry for {name} {archive_id_hex}."
  1773. )
  1774. else:
  1775. self.error_found = True
  1776. if self.repair:
  1777. logger.warning(f"Creating archives directory entry for {name} {archive_id_hex}.")
  1778. self.manifest.archives.create(name, archive_id, archive.time)
  1779. else:
  1780. logger.warning(f"Would create archives directory entry for {name} {archive_id_hex}.")
  1781. pi.finish()
  1782. logger.info("Rebuilding missing archives directory entries completed.")
  1783. def rebuild_archives(
  1784. self, first=0, last=0, sort_by="", match=None, older=None, newer=None, oldest=None, newest=None
  1785. ):
  1786. """Analyze and rebuild archives, expecting some damage and trying to make stuff consistent again."""
  1787. def add_callback(chunk):
  1788. id_ = self.key.id_hash(chunk)
  1789. cdata = self.repo_objs.format(id_, {}, chunk, ro_type=ROBJ_ARCHIVE_STREAM)
  1790. add_reference(id_, len(chunk), cdata)
  1791. return id_
  1792. def add_reference(id_, size, cdata):
  1793. # either we already have this chunk in repo and chunks index or we add it now
  1794. if id_ not in self.chunks:
  1795. assert cdata is not None
  1796. self.chunks[id_] = ChunkIndexEntry(flags=ChunkIndex.F_USED, size=size)
  1797. if self.repair:
  1798. self.repository.put(id_, cdata)
  1799. def verify_file_chunks(archive_name, item):
  1800. """Verifies that all file chunks are present. Missing file chunks will be logged."""
  1801. offset = 0
  1802. for chunk in item.chunks:
  1803. chunk_id, size = chunk
  1804. if chunk_id not in self.chunks:
  1805. logger.error(
  1806. "{}: {}: Missing file chunk detected (Byte {}-{}, Chunk {}).".format(
  1807. archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)
  1808. )
  1809. )
  1810. self.error_found = True
  1811. offset += size
  1812. if "size" in item:
  1813. item_size = item.size
  1814. item_chunks_size = item.get_size(from_chunks=True)
  1815. if item_size != item_chunks_size:
  1816. # just warn, but keep the inconsistency, so that borg extract can warn about it.
  1817. logger.warning(
  1818. "{}: {}: size inconsistency detected: size {}, chunks size {}".format(
  1819. archive_name, item.path, item_size, item_chunks_size
  1820. )
  1821. )
  1822. def robust_iterator(archive):
  1823. """Iterates through all archive items
  1824. Missing item chunks will be skipped and the msgpack stream will be restarted
  1825. """
  1826. item_keys = self.manifest.item_keys
  1827. required_item_keys = REQUIRED_ITEM_KEYS
  1828. unpacker = RobustUnpacker(
  1829. lambda item: isinstance(item, StableDict) and "path" in item, self.manifest.item_keys
  1830. )
  1831. _state = 0
  1832. def missing_chunk_detector(chunk_id):
  1833. nonlocal _state
  1834. if _state % 2 != int(chunk_id not in self.chunks):
  1835. _state += 1
  1836. return _state
  1837. def report(msg, chunk_id, chunk_no):
  1838. cid = bin_to_hex(chunk_id)
  1839. msg += " [chunk: %06d_%s]" % (chunk_no, cid) # see "debug dump-archive-items"
  1840. self.error_found = True
  1841. logger.error(msg)
  1842. def list_keys_safe(keys):
  1843. return ", ".join(k.decode(errors="replace") if isinstance(k, bytes) else str(k) for k in keys)
  1844. def valid_item(obj):
  1845. if not isinstance(obj, StableDict):
  1846. return False, "not a dictionary"
  1847. keys = set(obj)
  1848. if not required_item_keys.issubset(keys):
  1849. return False, "missing required keys: " + list_keys_safe(required_item_keys - keys)
  1850. if not keys.issubset(item_keys):
  1851. return False, "invalid keys: " + list_keys_safe(keys - item_keys)
  1852. return True, ""
  1853. i = 0
  1854. archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository)
  1855. for state, items in groupby(archive_items, missing_chunk_detector):
  1856. items = list(items)
  1857. if state % 2:
  1858. for chunk_id in items:
  1859. report("item metadata chunk missing", chunk_id, i)
  1860. i += 1
  1861. continue
  1862. if state > 0:
  1863. unpacker.resync()
  1864. for chunk_id, cdata in zip(items, repository.get_many(items)):
  1865. try:
  1866. _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM)
  1867. unpacker.feed(data)
  1868. for item in unpacker:
  1869. valid, reason = valid_item(item)
  1870. if valid:
  1871. yield Item(internal_dict=item)
  1872. else:
  1873. report(
  1874. "Did not get expected metadata dict when unpacking item metadata (%s)" % reason,
  1875. chunk_id,
  1876. i,
  1877. )
  1878. except IntegrityError as integrity_error:
  1879. # repo_objs.parse() detected integrity issues.
  1880. # maybe the repo gave us a valid cdata, but not for the chunk_id we wanted.
  1881. # or the authentication of cdata failed, meaning the encrypted data was corrupted.
  1882. report(str(integrity_error), chunk_id, i)
  1883. except msgpack.UnpackException:
  1884. report("Unpacker crashed while unpacking item metadata, trying to resync...", chunk_id, i)
  1885. unpacker.resync()
  1886. except Exception:
  1887. report("Exception while decrypting or unpacking item metadata", chunk_id, i)
  1888. raise
  1889. i += 1
  1890. sort_by = sort_by.split(",")
  1891. if any((first, last, match, older, newer, newest, oldest)):
  1892. archive_infos = self.manifest.archives.list(
  1893. sort_by=sort_by,
  1894. match=match,
  1895. first=first,
  1896. last=last,
  1897. oldest=oldest,
  1898. newest=newest,
  1899. older=older,
  1900. newer=newer,
  1901. )
  1902. if match and not archive_infos:
  1903. logger.warning("--match-archives %s does not match any archives", match)
  1904. if first and len(archive_infos) < first:
  1905. logger.warning("--first %d archives: only found %d archives", first, len(archive_infos))
  1906. if last and len(archive_infos) < last:
  1907. logger.warning("--last %d archives: only found %d archives", last, len(archive_infos))
  1908. else:
  1909. archive_infos = self.manifest.archives.list(sort_by=sort_by)
  1910. num_archives = len(archive_infos)
  1911. pi = ProgressIndicatorPercent(
  1912. total=num_archives, msg="Checking archives %3.1f%%", step=0.1, msgid="check.rebuild_archives"
  1913. )
  1914. with cache_if_remote(self.repository) as repository:
  1915. for i, info in enumerate(archive_infos):
  1916. pi.show(i)
  1917. archive_id, archive_id_hex = info.id, bin_to_hex(info.id)
  1918. logger.info(
  1919. f"Analyzing archive {info.name} {info.ts.astimezone()} {archive_id_hex} ({i + 1}/{num_archives})"
  1920. )
  1921. if archive_id not in self.chunks:
  1922. logger.error(f"Archive metadata block {archive_id_hex} is missing!")
  1923. self.error_found = True
  1924. if self.repair:
  1925. logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
  1926. self.manifest.archives.delete_by_id(archive_id)
  1927. else:
  1928. logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
  1929. continue
  1930. cdata = self.repository.get(archive_id)
  1931. try:
  1932. _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META)
  1933. except IntegrityError as integrity_error:
  1934. logger.error(f"Archive metadata block {archive_id_hex} is corrupted: {integrity_error}")
  1935. self.error_found = True
  1936. if self.repair:
  1937. logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
  1938. self.manifest.archives.delete_by_id(archive_id)
  1939. else:
  1940. logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
  1941. continue
  1942. archive = self.key.unpack_archive(data)
  1943. archive = ArchiveItem(internal_dict=archive)
  1944. if archive.version != 2:
  1945. raise Exception("Unknown archive metadata version")
  1946. items_buffer = ChunkBuffer(self.key)
  1947. items_buffer.write_chunk = add_callback
  1948. for item in robust_iterator(archive):
  1949. if "chunks" in item:
  1950. verify_file_chunks(info.name, item)
  1951. items_buffer.add(item)
  1952. items_buffer.flush(flush=True)
  1953. if self.repair:
  1954. archive.item_ptrs = archive_put_items(
  1955. items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
  1956. )
  1957. data = self.key.pack_metadata(archive.as_dict())
  1958. new_archive_id = self.key.id_hash(data)
  1959. logger.debug(f"archive id old: {bin_to_hex(archive_id)}")
  1960. logger.debug(f"archive id new: {bin_to_hex(new_archive_id)}")
  1961. cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META)
  1962. add_reference(new_archive_id, len(data), cdata)
  1963. self.manifest.archives.create(info.name, new_archive_id, info.ts)
  1964. if archive_id != new_archive_id:
  1965. self.manifest.archives.delete_by_id(archive_id)
  1966. pi.finish()
  1967. def finish(self):
  1968. if self.repair:
  1969. # we may have deleted chunks, remove the chunks index cache!
  1970. logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
  1971. delete_chunkindex_cache(self.repository)
  1972. logger.info("Writing Manifest.")
  1973. self.manifest.write()
  1974. class ArchiveRecreater:
  1975. class Interrupted(Exception):
  1976. def __init__(self, metadata=None):
  1977. self.metadata = metadata or {}
  1978. @staticmethod
  1979. def is_temporary_archive(archive_name):
  1980. return archive_name.endswith(".recreate")
  1981. def __init__(
  1982. self,
  1983. manifest,
  1984. cache,
  1985. matcher,
  1986. exclude_caches=False,
  1987. exclude_if_present=None,
  1988. keep_exclude_tags=False,
  1989. chunker_params=None,
  1990. compression=None,
  1991. dry_run=False,
  1992. stats=False,
  1993. progress=False,
  1994. file_status_printer=None,
  1995. timestamp=None,
  1996. ):
  1997. self.manifest = manifest
  1998. self.repository = manifest.repository
  1999. self.key = manifest.key
  2000. self.repo_objs = manifest.repo_objs
  2001. self.cache = cache
  2002. self.matcher = matcher
  2003. self.exclude_caches = exclude_caches
  2004. self.exclude_if_present = exclude_if_present or []
  2005. self.keep_exclude_tags = keep_exclude_tags
  2006. self.rechunkify = chunker_params is not None
  2007. if self.rechunkify:
  2008. logger.debug("Rechunking archives to %s", chunker_params)
  2009. self.chunker_params = chunker_params or CHUNKER_PARAMS
  2010. self.compression = compression or CompressionSpec("none")
  2011. self.seen_chunks = set()
  2012. self.timestamp = timestamp
  2013. self.dry_run = dry_run
  2014. self.stats = stats
  2015. self.progress = progress
  2016. self.print_file_status = file_status_printer or (lambda *args: None)
  2017. def recreate(self, archive_id, target_name, delete_original, comment=None):
  2018. archive = self.open_archive(archive_id)
  2019. target = self.create_target(archive, target_name)
  2020. if self.exclude_if_present or self.exclude_caches:
  2021. self.matcher_add_tagged_dirs(archive)
  2022. if self.matcher.empty() and not target.recreate_rechunkify and comment is None:
  2023. # nothing to do
  2024. return False
  2025. self.process_items(archive, target)
  2026. self.save(archive, target, comment, delete_original=delete_original)
  2027. return True
  2028. def process_items(self, archive, target):
  2029. matcher = self.matcher
  2030. for item in archive.iter_items():
  2031. if not matcher.match(item.path):
  2032. self.print_file_status("-", item.path) # excluded (either by "-" or by "!")
  2033. continue
  2034. if self.dry_run:
  2035. self.print_file_status("+", item.path) # included
  2036. else:
  2037. self.process_item(archive, target, item)
  2038. if self.progress:
  2039. target.stats.show_progress(final=True)
  2040. def process_item(self, archive, target, item):
  2041. status = file_status(item.mode)
  2042. if "chunks" in item:
  2043. self.print_file_status(status, item.path)
  2044. status = None
  2045. self.process_chunks(archive, target, item)
  2046. target.stats.nfiles += 1
  2047. target.add_item(item, stats=target.stats)
  2048. self.print_file_status(status, item.path)
  2049. def process_chunks(self, archive, target, item):
  2050. if not target.recreate_rechunkify:
  2051. for chunk_id, size in item.chunks:
  2052. self.cache.reuse_chunk(chunk_id, size, target.stats)
  2053. return item.chunks
  2054. chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
  2055. chunk_processor = partial(self.chunk_processor, target)
  2056. target.process_file_chunks(item, self.cache, target.stats, self.progress, chunk_iterator, chunk_processor)
  2057. def chunk_processor(self, target, chunk):
  2058. chunk_id, data = cached_hash(chunk, self.key.id_hash)
  2059. size = len(data)
  2060. if chunk_id in self.seen_chunks:
  2061. return self.cache.reuse_chunk(chunk_id, size, target.stats)
  2062. chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
  2063. self.cache.repository.async_response(wait=False)
  2064. self.seen_chunks.add(chunk_entry.id)
  2065. return chunk_entry
  2066. def iter_chunks(self, archive, target, chunks):
  2067. chunk_iterator = archive.pipeline.fetch_many(chunks, ro_type=ROBJ_FILE_STREAM)
  2068. if target.recreate_rechunkify:
  2069. # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
  2070. # (does not load the entire file into memory)
  2071. file = ChunkIteratorFileWrapper(chunk_iterator)
  2072. yield from target.chunker.chunkify(file)
  2073. else:
  2074. for chunk in chunk_iterator:
  2075. yield Chunk(chunk, size=len(chunk), allocation=CH_DATA)
  2076. def save(self, archive, target, comment=None, delete_original=True):
  2077. if self.dry_run:
  2078. return
  2079. if comment is None:
  2080. comment = archive.metadata.get("comment", "")
  2081. # Keep for the statistics if necessary
  2082. if self.stats:
  2083. _start = target.start
  2084. if self.timestamp is None:
  2085. additional_metadata = {
  2086. "time": archive.metadata.time,
  2087. "time_end": archive.metadata.get("time_end") or archive.metadata.time,
  2088. "command_line": archive.metadata.command_line,
  2089. # but also remember recreate metadata:
  2090. "recreate_command_line": join_cmd(sys.argv),
  2091. }
  2092. else:
  2093. additional_metadata = {
  2094. "command_line": archive.metadata.command_line,
  2095. # but also remember recreate metadata:
  2096. "recreate_command_line": join_cmd(sys.argv),
  2097. }
  2098. target.save(comment=comment, timestamp=self.timestamp, additional_metadata=additional_metadata)
  2099. if delete_original:
  2100. archive.delete()
  2101. if self.stats:
  2102. target.start = _start
  2103. target.end = archive_ts_now()
  2104. log_multi(str(target), str(target.stats))
  2105. def matcher_add_tagged_dirs(self, archive):
  2106. """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
  2107. def exclude(dir, tag_item):
  2108. if self.keep_exclude_tags:
  2109. tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False))
  2110. tagged_dirs.append(FnmatchPattern(dir + "/", recurse_dir=False))
  2111. else:
  2112. tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False))
  2113. matcher = self.matcher
  2114. tag_files = []
  2115. tagged_dirs = []
  2116. for item in archive.iter_items(
  2117. filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)
  2118. ):
  2119. dir, tag_file = os.path.split(item.path)
  2120. if tag_file in self.exclude_if_present:
  2121. exclude(dir, item)
  2122. elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode):
  2123. file = open_item(archive, item)
  2124. if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS:
  2125. exclude(dir, item)
  2126. matcher.add(tag_files, IECommand.Include)
  2127. matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse)
  2128. def create_target(self, archive, target_name):
  2129. """Create target archive."""
  2130. target = self.create_target_archive(target_name)
  2131. # If the archives use the same chunker params, then don't rechunkify
  2132. source_chunker_params = tuple(archive.metadata.get("chunker_params", []))
  2133. if len(source_chunker_params) == 4 and isinstance(source_chunker_params[0], int):
  2134. # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash:
  2135. source_chunker_params = (CH_BUZHASH,) + source_chunker_params
  2136. target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
  2137. if target.recreate_rechunkify:
  2138. logger.debug(
  2139. "Rechunking archive from %s to %s", source_chunker_params or "(unknown)", target.chunker_params
  2140. )
  2141. target.process_file_chunks = ChunksProcessor(
  2142. cache=self.cache, key=self.key, add_item=target.add_item, rechunkify=target.recreate_rechunkify
  2143. ).process_file_chunks
  2144. target.chunker = get_chunker(*target.chunker_params, key=self.key, sparse=False)
  2145. return target
  2146. def create_target_archive(self, name):
  2147. target = Archive(
  2148. self.manifest,
  2149. name,
  2150. create=True,
  2151. progress=self.progress,
  2152. chunker_params=self.chunker_params,
  2153. cache=self.cache,
  2154. )
  2155. return target
  2156. def open_archive(self, archive_id, **kwargs):
  2157. return Archive(self.manifest, archive_id, cache=self.cache, **kwargs)