import json
import os
import socket
import stat
import sys
import time
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby, zip_longest
from shutil import get_terminal_size

from .platformflags import is_win32, is_linux, is_freebsd, is_darwin

from .logger import create_logger

logger = create_logger()

from . import xattr
from .chunker import get_chunker, max_chunk_size, Chunk
from .cache import ChunkListEntry
from .crypto.key import key_factory
from .compress import Compressor, CompressionSpec
from .constants import *  # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Manifest
from .helpers import hardlinkable
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .platform import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import os_open, flags_normal, flags_dir
from .helpers import msgpack
from .helpers import sig_int
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from .lrucache import LRUCache
from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT

has_lchmod = hasattr(os, 'lchmod')
has_link = hasattr(os, 'link')


class Statistics:

    def __init__(self, output_json=False):
        self.output_json = output_json
        self.osize = self.csize = self.usize = self.nfiles = 0
        self.osize_parts = self.csize_parts = self.usize_parts = self.nfiles_parts = 0
        self.last_progress = 0  # timestamp when last progress was shown

    def update(self, size, csize, unique, part=False):
        if not part:
            self.osize += size
            self.csize += csize
            if unique:
                self.usize += csize
        else:
            self.osize_parts += size
            self.csize_parts += csize
            if unique:
                self.usize_parts += csize

    def __add__(self, other):
        if not isinstance(other, Statistics):
            raise TypeError('can only add Statistics objects')
        stats = Statistics(self.output_json)
        stats.osize = self.osize + other.osize
        stats.csize = self.csize + other.csize
        stats.usize = self.usize + other.usize
        stats.nfiles = self.nfiles + other.nfiles
        stats.osize_parts = self.osize_parts + other.osize_parts
        stats.csize_parts = self.csize_parts + other.csize_parts
        stats.usize_parts = self.usize_parts + other.usize_parts
        stats.nfiles_parts = self.nfiles_parts + other.nfiles_parts
        return stats

    summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    def as_dict(self):
        return {
            'original_size': FileSize(self.osize),
            'compressed_size': FileSize(self.csize),
            'deduplicated_size': FileSize(self.usize),
            'nfiles': self.nfiles,
        }

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                data = self.as_dict()
                data.update({
                    'time': time.time(),
                    'type': 'archive_progress',
                    'path': remove_surrogates(item.path if item else ''),
                })
                msg = json.dumps(data)
                end = '\n'
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                    path = remove_surrogates(item.path) if item else ''
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ''
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = ' ' * columns
                end = '\r'
            print(msg, end=end, file=stream or sys.stderr, flush=True)
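
# Usage sketch (illustrative): Statistics.update() is called once per stored chunk with its
# original size, compressed size and a flag telling whether the chunk was new to the
# repository; only new ("unique") chunks count towards the deduplicated size:
#
#   stats = Statistics()
#   stats.update(size=1000, csize=400, unique=True)   # new chunk
#   stats.update(size=1000, csize=400, unique=False)  # duplicate of an already stored chunk
#   # now: stats.osize == 2000, stats.csize == 800, stats.usize == 400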


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupError(Exception):
    """
    Exception raised for non-OSError-based exceptions while accessing backup files.
    """


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        if self.op:
            return '%s: %s' % (self.op, self.os_error)
        else:
            return str(self.os_error)


class BackupIO:
    op = ''

    def __call__(self, op=''):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val


backup_io = BackupIO()


def backup_io_iter(iterator):
    backup_io.op = 'read'
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item
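
# Usage sketch (illustrative): backup_io wraps filesystem calls on input/output files so that
# an OSError there is reported as a non-critical BackupOSError instead of aborting the run:
#
#   with backup_io('open'):
#       fd = open(path, 'rb')   # an OSError here is re-raised as BackupOSError('open', exc)
#
# backup_io_iter() applies the same wrapping to each next() call on a chunker iterator,
# while exceptions raised by the consumer of the yielded chunks stay unwrapped.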


def stat_update_check(st_old, st_curr):
    """
    this checks for some race conditions between the first filename-based stat()
    we did before dispatching to the (hopefully correct) file type backup handler
    and the (hopefully) fd-based fstat() we did in the handler.

    if there is a problematic difference (e.g. file type changed), we rather
    skip the file than being tricked into a security problem.

    such races should only happen if:
    - we are backing up a live filesystem (no snapshot, not inactive)
    - if files change due to normal fs activity at an unfortunate time
    - if somebody is doing an attack against us
    """
    # assuming that a file type change implicates a different inode change AND that inode numbers
    # are not duplicate in a short timeframe, this check is redundant and solved by the ino check:
    if stat.S_IFMT(st_old.st_mode) != stat.S_IFMT(st_curr.st_mode):
        # in this case, we dispatched to wrong handler - abort
        raise BackupError('file type changed (race condition), skipping file')
    if st_old.st_ino != st_curr.st_ino:
        # in this case, the hardlinks-related code in create_helper has the wrong inode - abort!
        raise BackupError('file inode changed (race condition), skipping file')
    # looks ok, we are still dealing with the same thing - return current stat:
    return st_curr


@contextmanager
def OsOpen(*, flags, path=None, parent_fd=None, name=None, noatime=False, op='open'):
    with backup_io(op):
        fd = os_open(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=noatime)
    try:
        yield fd
    finally:
        # On windows fd is None for directories.
        if fd is not None:
            os.close(fd)


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        def _preload(chunks):
            self.repository.preload([c.id for c in chunks])

        masters_preloaded = set()
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
                if 'chunks' in item:
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]

            if filter:
                items = [item for item in items if filter(item)]

            if preload:
                if filter and partial_extract:
                    # if we do only a partial extraction, it gets a bit
                    # complicated with computing the preload items: if a hardlink master item is not
                    # selected (== not extracted), we will still need to preload its chunks if a
                    # corresponding hardlink slave is selected (== is extracted).
                    # due to a side effect of the filter() call, we now have hardlink_masters dict populated.
                    for item in items:
                        if 'chunks' in item:  # regular file, maybe a hardlink master
                            _preload(item.chunks)
                            # if this is a hardlink master, remember that we already preloaded it:
                            if 'source' not in item and hardlinkable(item.mode) and item.get('hardlink_master', True):
                                masters_preloaded.add(item.path)
                        elif 'source' in item and hardlinkable(item.mode):  # hardlink slave
                            source = item.source
                            if source not in masters_preloaded:
                                # we only need to preload *once* (for the 1st selected slave)
                                chunks, _ = hardlink_masters[source]
                                if chunks is not None:
                                    _preload(chunks)
                                masters_preloaded.add(source)
                else:
                    # easy: we do not have a filter, thus all items are selected, thus we need to preload all chunks.
                    for item in items:
                        if 'chunks' in item:
                            _preload(item.chunks)

            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer()
        self.chunks = []
        self.key = key
        self.chunker = get_chunker(*chunker_params, seed=self.key.chunk_seed)

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        # note: this is the items metadata stream chunker, we only will get CH_DATA allocation here,
        # thus chunk.data will always be data bytes.
        chunks = list(bytes(chunk.data) for chunk in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
        self.cache.repository.async_response(wait=False)
        return id_
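
# Usage sketch (illustrative): ChunkBuffer accumulates msgpacked items and cuts the item
# metadata stream into chunks. A plain flush() keeps the trailing partial chunk in the
# buffer so it can still grow, while flush(flush=True) writes everything out, as done at
# the end of Archive.save():
#
#   buffer = CacheChunkBuffer(cache, key, stats)
#   buffer.add(item)           # may trigger an implicit flush() once BUFFER_SIZE is exceeded
#   buffer.flush(flush=True)   # finalize: also write out the last partial chunk
#   item_stream_chunk_ids = buffer.chunks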


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=1800, numeric_owner=False, noatime=False, noctime=False, noflags=False,
                 progress=False, chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics(output_json=log_json)
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        self.noflags = noflags
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)
        self.zeros = None

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
        self.name = self.metadata.name
        self.comment = self.metadata.get('comment', '')

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get('time_end') or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start.replace(tzinfo=timezone.utc)
            end = self.end.replace(tzinfo=timezone.utc)
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            'name': self.name,
            'id': self.fpr,
            'start': OutputTimestamp(start),
            'end': OutputTimestamp(end),
            'duration': (end - start).total_seconds(),
            'stats': stats.as_dict(),
            'limits': {
                'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            },
        }
        if self.create:
            info['command_line'] = sys.argv
        else:
            info.update({
                'command_line': self.metadata.cmdline,
                'hostname': self.metadata.hostname,
                'username': self.metadata.username,
                'comment': self.metadata.get('comment', ''),
                'chunker_params': self.metadata.get('chunker_params', ''),
            })
        return info

    def __str__(self):
        return '''\
Repository: {location}
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
Utilization of max. archive size: {csize_max:.0%}
'''.format(
            self,
            start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)),
            end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)),
            csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            location=self.repository._location.canonical_path()
        )

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and 'part' in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
        # note: when calling this with preload=True, later fetch_many() must be called with
        # is_preloaded=True or the RemoteRepository code will leak memory!
        assert not (filter and partial_extract and preload) or hardlink_masters is not None
        for item in self.pipeline.unpack_many(self.metadata.items, partial_extract=partial_extract,
                                              preload=preload, hardlink_masters=hardlink_masters,
                                              filter=lambda item: self.item_filter(item, filter)):
            yield item

    def add_item(self, item, show_progress=True, stats=None):
        if show_progress and self.show_progress:
            if stats is None:
                stats = self.stats
            stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = datetime.utcnow()
            start = end - duration
        else:
            end = timestamp + duration
            start = timestamp
        self.start = start
        self.end = end
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment or '',
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': hostname,
            'username': getuser(),
            'time': start.strftime(ISO_FORMAT),
            'time_end': end.strftime(ISO_FORMAT),
            'chunker_params': self.chunker_params,
        }
        if stats is not None:
            metadata.update({
                'size': stats.osize,
                'csize': stats.csize,
                'nfiles': stats.nfiles,
                'size_parts': stats.osize_parts,
                'csize_parts': stats.csize_parts,
                'nfiles_parts': stats.nfiles_parts})
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
        try:
            self.cache.add_chunk(self.id, data, self.stats)
        except IntegrityError as err:
            err_msg = str(err)
            # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
            if 'More than allowed put data' in err_msg:
                raise Error('%s - archive too big (issue #1473)!' % err_msg)
            else:
                raise
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit(compact=False)
        self.cache.commit()

    def calc_stats(self, cache, want_unique=True):
        have_borg12_meta = self.metadata.get('nfiles') is not None

        if have_borg12_meta and not want_unique:
            unique_csize = 0
        else:
            def add(id):
                entry = cache.chunks[id]
                archive_index.add(id, 1, entry.size, entry.csize)

            archive_index = ChunkIndex()
            sync = CacheSynchronizer(archive_index)
            add(self.id)
            pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%',
                                          msgid='archive.calc_stats')
            for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
                pi.show(increase=1)
                add(id)
                data = self.key.decrypt(id, chunk)
                sync.feed(data)
            unique_csize = archive_index.stats_against(cache.chunks)[3]
            pi.finish()

        stats = Statistics()
        stats.usize = unique_csize  # the part files use same chunks as the full file
        if not have_borg12_meta:
            if self.consider_part_files:
                stats.nfiles = sync.num_files_totals
                stats.osize = sync.size_totals
                stats.csize = sync.csize_totals
            else:
                stats.nfiles = sync.num_files_totals - sync.num_files_parts
                stats.osize = sync.size_totals - sync.size_parts
                stats.csize = sync.csize_totals - sync.csize_parts
        else:
            if self.consider_part_files:
                stats.nfiles = self.metadata.nfiles_parts + self.metadata.nfiles
                stats.osize = self.metadata.size_parts + self.metadata.size
                stats.csize = self.metadata.csize_parts + self.metadata.csize
            else:
                stats.nfiles = self.metadata.nfiles
                stats.osize = self.metadata.size
                stats.csize = self.metadata.csize
        return stats

    @contextmanager
    def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters):
        hardlink_set = False
        # Hard link?
        if 'source' in item:
            source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
            chunks, link_target = hardlink_masters.get(item.source, (None, source))
            if link_target and has_link:
                # Hard link was extracted previously, just link
                with backup_io('link'):
                    os.link(link_target, path)
                    hardlink_set = True
            elif chunks is not None:
                # assign chunks to this item, since the item which had the chunks was not extracted
                item.chunks = chunks
        yield hardlink_set
        if not hardlink_set and hardlink_masters:
            if has_link:
                # Update master entry with extracted item path, so that following hardlinks don't extract twice.
                # We have hardlinking support, so we will hardlink not extract.
                hardlink_masters[item.get('source') or original_path] = (None, path)
            else:
                # Broken platform with no hardlinking support.
                # In this case, we *want* to extract twice, because there is no other way.
                pass

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        hardlink_masters = hardlink_masters or {}
        has_damaged_chunks = 'chunks_healthy' in item
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
                        sys.stdout.buffer.write(data)
                    item_chunks_size += len(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
                            item_size, item_chunks_size))
            if has_damaged_chunks:
                raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.')
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(('/', '../')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io('makedirs'):
                make_parent(path)
            with self.extract_helper(dest, item, path, stripped_components, original_path,
                                     hardlink_masters) as hardlink_set:
                if hardlink_set:
                    return
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * max_chunk_size(*self.chunker_params)
                with backup_io('open'):
                    fd = open(path, 'wb')
                with fd:
                    ids = [c.id for c in item.chunks]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io('write'):
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io('truncate_and_attrs'):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
                            item_size, item_chunks_size))
                if has_damaged_chunks:
                    raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.')
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item.source
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = 'attrs'
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item.user)
            gid = group2gid(item.group)
        uid = item.uid if uid is None else uid
        gid = item.gid if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        if not is_win32:
            try:
                if fd:
                    os.fchown(fd, uid, gid)
                else:
                    os.chown(path, uid, gid, follow_symlinks=False)
            except OSError:
                pass
            if fd:
                os.fchmod(fd, item.mode)
            elif not symlink:
                os.chmod(path, item.mode)
            elif has_lchmod:  # Not available on Linux
                os.lchmod(path, item.mode)
            mtime = item.mtime
            if 'atime' in item:
                atime = item.atime
            else:
                # old archives only had mtime in item metadata
                atime = mtime
            if 'birthtime' in item:
                birthtime = item.birthtime
                try:
                    # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms.
                    # See utimes(2) on either of the BSDs for details.
                    if fd:
                        os.utime(fd, None, ns=(atime, birthtime))
                    else:
                        os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False)
                except OSError:
                    # some systems don't support calling utime on a symlink
                    pass
            try:
                if fd:
                    os.utime(fd, None, ns=(atime, mtime))
                else:
                    os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
            except OSError:
                # some systems don't support calling utime on a symlink
                pass
            acl_set(path, item, self.numeric_owner, fd=fd)
            # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
            # the Linux capabilities in the "security.capability" attribute.
            warning = xattr.set_all(fd or path, item.get('xattrs', {}), follow_symlinks=False)
            if warning:
                set_ec(EXIT_WARNING)
            # bsdflags include the immutable flag and need to be set last:
            if not self.noflags and 'bsdflags' in item:
                try:
                    set_flags(path, item.bsdflags, fd=fd)
                except OSError:
                    pass

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict())
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta('name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        exception_ignored = object()

        def fetch_async_response(wait=True):
            try:
                return self.repository.async_response(wait=wait)
            except Repository.ObjectNotFound:
                nonlocal error
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True
                return exception_ignored  # must not return None here

        def chunk_decref(id, stats, part=False):
            try:
                self.cache.chunk_decref(id, stats, wait=False, part=part)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            else:
                fetch_async_response(wait=False)

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                data = self.key.decrypt(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if 'chunks' in item:
                            part = not self.consider_part_files and 'part' in item
                            for chunk_id, size, csize in item.chunks:
                                chunk_decref(chunk_id, stats, part=part)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        while fetch_async_response(wait=True) is not None:
            # we did async deletes, process outstanding results (== exceptions),
            # so there is nothing pending when we return and our caller wants to commit.
            pass
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    @staticmethod
    def compare_archives_iter(archive1, archive2, matcher=None, can_compare_chunk_ids=False):
        """
        Yields tuples with a path and an ItemDiff instance describing changes/indicating equality.

        :param matcher: PatternMatcher class to restrict results to only matching paths.
        :param can_compare_chunk_ids: Whether --chunker-params are the same for both archives.
        """
        def hardlink_master_seen(item):
            return 'source' not in item or not hardlinkable(item.mode) or item.source in hardlink_masters

        def is_hardlink_master(item):
            return item.get('hardlink_master', True) and 'source' not in item

        def update_hardlink_masters(item1, item2):
            if is_hardlink_master(item1) or is_hardlink_master(item2):
                hardlink_masters[item1.path] = (item1, item2)

        def has_hardlink_master(item, hardlink_masters):
            return hardlinkable(item.mode) and item.get('source') in hardlink_masters

        def compare_items(item1, item2):
            if has_hardlink_master(item1, hardlink_masters):
                item1 = hardlink_masters[item1.source][0]
            if has_hardlink_master(item2, hardlink_masters):
                item2 = hardlink_masters[item2.source][1]
            return ItemDiff(item1, item2,
                            archive1.pipeline.fetch_many([c.id for c in item1.get('chunks', [])]),
                            archive2.pipeline.fetch_many([c.id for c in item2.get('chunks', [])]),
                            can_compare_chunk_ids=can_compare_chunk_ids)

        def defer_if_necessary(item1, item2):
            """Adds item tuple to deferred if necessary and returns True, if items were deferred"""
            update_hardlink_masters(item1, item2)
            defer = not hardlink_master_seen(item1) or not hardlink_master_seen(item2)
            if defer:
                deferred.append((item1, item2))
            return defer

        orphans_archive1 = OrderedDict()
        orphans_archive2 = OrderedDict()
        deferred = []
        hardlink_masters = {}

        for item1, item2 in zip_longest(
                archive1.iter_items(lambda item: matcher.match(item.path)),
                archive2.iter_items(lambda item: matcher.match(item.path)),
        ):
            if item1 and item2 and item1.path == item2.path:
                if not defer_if_necessary(item1, item2):
                    yield (item1.path, compare_items(item1, item2))
                continue
            if item1:
                matching_orphan = orphans_archive2.pop(item1.path, None)
                if matching_orphan:
                    if not defer_if_necessary(item1, matching_orphan):
                        yield (item1.path, compare_items(item1, matching_orphan))
                else:
                    orphans_archive1[item1.path] = item1
            if item2:
                matching_orphan = orphans_archive1.pop(item2.path, None)
                if matching_orphan:
                    if not defer_if_necessary(matching_orphan, item2):
                        yield (matching_orphan.path, compare_items(matching_orphan, item2))
                else:
                    orphans_archive2[item2.path] = item2
        # At this point orphans_* contain items that had no matching partner in the other archive
        for added in orphans_archive2.values():
            path = added.path
            deleted_item = Item.create_deleted(path)
            update_hardlink_masters(deleted_item, added)
            yield (path, compare_items(deleted_item, added))
        for deleted in orphans_archive1.values():
            path = deleted.path
            deleted_item = Item.create_deleted(path)
            update_hardlink_masters(deleted, deleted_item)
            yield (path, compare_items(deleted, deleted_item))
        for item1, item2 in deferred:
            assert hardlink_master_seen(item1)
            assert hardlink_master_seen(item2)
            yield (path, compare_items(item1, item2))


class MetadataCollector:

    def __init__(self, *, noatime, noctime, numeric_owner, noflags, nobirthtime):
        self.noatime = noatime
        self.noctime = noctime
        self.numeric_owner = numeric_owner
        self.noflags = noflags
        self.nobirthtime = nobirthtime

    def stat_simple_attrs(self, st):
        attrs = dict(
            mode=st.st_mode,
            uid=st.st_uid,
            gid=st.st_gid,
            mtime=safe_ns(st.st_mtime_ns),
        )
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            attrs['atime'] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs['ctime'] = safe_ns(st.st_ctime_ns)
        if not self.nobirthtime and hasattr(st, 'st_birthtime'):
            # sadly, there's no stat_result.st_birthtime_ns
            attrs['birthtime'] = safe_ns(int(st.st_birthtime * 10**9))
        if self.numeric_owner:
            attrs['user'] = attrs['group'] = None
        else:
            attrs['user'] = uid2user(st.st_uid)
            attrs['group'] = gid2group(st.st_gid)
        return attrs

    def stat_ext_attrs(self, st, path, fd=None):
        attrs = {}
        flags = 0
        with backup_io('extended stat'):
            if not self.noflags:
                flags = get_flags(path, st, fd=fd)
            xattrs = xattr.get_all(fd or path, follow_symlinks=False)
            acl_get(path, attrs, st, self.numeric_owner, fd=fd)
        if xattrs:
            attrs['xattrs'] = StableDict(xattrs)
        if flags:
            attrs['bsdflags'] = flags
        return attrs

    def stat_attrs(self, st, path, fd=None):
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path, fd=fd))
        return attrs
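
# Usage sketch (illustrative): MetadataCollector turns a stat result (plus xattrs, ACLs and
# flags, where available) into the metadata dict stored in the archived item, roughly:
#
#   mc = MetadataCollector(noatime=False, noctime=False, numeric_owner=False,
#                          noflags=False, nobirthtime=False)
#   st = os.stat(path, follow_symlinks=False)
#   attrs = mc.stat_attrs(st, path)
#   # attrs contains 'mode', 'uid', 'gid', 'user', 'group', 'mtime' and, depending on the
#   # options and platform, 'atime', 'ctime', 'birthtime', 'xattrs', 'bsdflags', ...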


class ChunksProcessor:
    # Processes an iterator of chunks for an Item

    def __init__(self, *, key, cache,
                 add_item, write_checkpoint,
                 checkpoint_interval, rechunkify):
        self.key = key
        self.cache = cache
        self.add_item = add_item
        self.write_checkpoint = write_checkpoint
        self.checkpoint_interval = checkpoint_interval
        self.last_checkpoint = time.monotonic()
        self.rechunkify = rechunkify
        self.zero_chunk_ids = LRUCache(10, dispose=lambda _: None)  # length of all-zero chunk -> chunk_id
        self.zeros = memoryview(bytes(MAX_DATA_SIZE))

    def write_part_file(self, item, from_chunk, number):
        item = Item(internal_dict=item.as_dict())
        length = len(item.chunks)
        # the item should only have the *additional* chunks we processed after the last partial item:
        item.chunks = item.chunks[from_chunk:]
        # for borg recreate, we already have a size member in the source item (giving the total file size),
        # but we consider only a part of the file here, thus we must recompute the size from the chunks:
        item.get_size(memorize=True, from_chunks=True)
        item.path += '.borg_part_%d' % number
        item.part = number
        number += 1
        self.add_item(item, show_progress=False)
        self.write_checkpoint()
        return length, number

    def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
        sig_int_triggered = sig_int and sig_int.action_triggered()
        if forced or sig_int_triggered or \
           self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
            if sig_int_triggered:
                logger.info('checkpoint requested: starting checkpoint creation...')
            from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
            self.last_checkpoint = time.monotonic()
            if sig_int_triggered:
                sig_int.action_completed()
                logger.info('checkpoint requested: finished checkpoint creation!')
        return from_chunk, part_number

    def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
        if not chunk_processor:
            def chunk_processor(chunk):
                allocation = chunk.meta['allocation']
                if allocation == CH_DATA:
                    data = chunk.data
                    chunk_id = self.key.id_hash(data)
                elif allocation == CH_HOLE:
                    size = chunk.meta['size']
                    data = self.zeros[:size]
                    try:
                        chunk_id = self.zero_chunk_ids[size]
                    except KeyError:
                        chunk_id = self.key.id_hash(data)
                        self.zero_chunk_ids[size] = chunk_id
                else:
                    raise ValueError('unexpected allocation type')
                chunk_entry = cache.add_chunk(chunk_id, data, stats, wait=False)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

        item.chunks = []
        # if we rechunkify, we'll get a fundamentally different chunks list, thus we need
        # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
        if self.rechunkify and 'chunks_healthy' in item:
            del item.chunks_healthy
        from_chunk = 0
        part_number = 1
        for chunk in chunk_iter:
            item.chunks.append(chunk_processor(chunk))
            if show_progress:
                stats.show_progress(item=item, dt=0.2)
            from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
        else:
            if part_number > 1:
                if item.chunks[from_chunk:]:
                    # if we already have created a part item inside this file, we want to put the final
                    # chunks (if any) into a part item also (so all parts can be concatenated to get
                    # the complete file):
                    from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)
                # if we created part files, we have referenced all chunks from the part files,
                # but we also will reference the same chunks also from the final, complete file:
                for chunk in item.chunks:
                    cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
                stats.nfiles_parts += part_number - 1
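
# Checkpointing sketch (illustrative): while a large file is being chunked, maybe_checkpoint()
# may store the chunks processed so far as a part item named '<path>.borg_part_<n>' and write
# a checkpoint archive. If that happened at least once, the final complete item references the
# same chunks again, so part items only add extra references (and are normally ignored unless
# consider_part_files is enabled). Assuming cp is a ChunksProcessor and fd an open file:
#
#   cp.process_file_chunks(item, cache, stats, show_progress=False,
#                          chunk_iter=backup_io_iter(chunker.chunkify(fd)))
#   # item.chunks now lists all chunks of the file; any part items were already
#   # added to the archive via write_checkpoint().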
  1059. class FilesystemObjectProcessors:
  1060. # When ported to threading, then this doesn't need chunker, cache, key any more.
  1061. # write_checkpoint should then be in the item buffer,
  1062. # and process_file becomes a callback passed to __init__.
  1063. def __init__(self, *, metadata_collector, cache, key,
  1064. add_item, process_file_chunks,
  1065. chunker_params, show_progress, sparse):
  1066. self.metadata_collector = metadata_collector
  1067. self.cache = cache
  1068. self.key = key
  1069. self.add_item = add_item
  1070. self.process_file_chunks = process_file_chunks
  1071. self.show_progress = show_progress
  1072. self.hard_links = {}
  1073. self.stats = Statistics() # threading: done by cache (including progress)
  1074. self.cwd = os.getcwd()
  1075. self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=sparse)
  1076. @contextmanager
  1077. def create_helper(self, path, st, status=None, hardlinkable=True):
  1078. safe_path = make_path_safe(path)
  1079. item = Item(path=safe_path)
  1080. hardlink_master = False
  1081. hardlinked = hardlinkable and st.st_nlink > 1
  1082. if hardlinked:
  1083. source = self.hard_links.get((st.st_ino, st.st_dev))
  1084. if source is not None:
  1085. item.source = source
  1086. status = 'h' # hardlink (to already seen inodes)
  1087. else:
  1088. hardlink_master = True
  1089. yield item, status, hardlinked, hardlink_master
  1090. # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
  1091. self.add_item(item, stats=self.stats)
  1092. # ... and added to the archive, so we can remember it to refer to it later in the archive:
  1093. if hardlink_master:
  1094. self.hard_links[(st.st_ino, st.st_dev)] = safe_path
    def process_dir_with_fd(self, *, path, fd, st):
        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
            return status

    def process_dir(self, *, path, parent_fd, name, st):
        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_dir,
                        noatime=True, op='dir_open') as fd:
                # fd is None for directories on windows, in that case a race condition check is not possible.
                if fd is not None:
                    with backup_io('fstat'):
                        st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_fifo(self, *, path, parent_fd, name, st):
        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags_normal, noatime=True) as fd:
                with backup_io('fstat'):
                    st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_attrs(st, path, fd=fd))
                return status

    def process_dev(self, *, path, parent_fd, name, st, dev_type):
        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
            # looks like we can not work fd-based here without causing issues when trying to open/close the device
            with backup_io('stat'):
                st = stat_update_check(st, os.stat(name, dir_fd=parent_fd, follow_symlinks=False))
            item.rdev = st.st_rdev
            item.update(self.metadata_collector.stat_attrs(st, path))
            return status

    def process_symlink(self, *, path, parent_fd, name, st):
        # note: using hardlinkable=False because we can not support hardlinked symlinks,
        # due to the dual-use of item.source, see issue #2343:
        # hardlinked symlinks will be archived [and extracted] as non-hardlinked symlinks.
        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            fname = name if name is not None and parent_fd is not None else path
            with backup_io('readlink'):
                source = os.readlink(fname, dir_fd=parent_fd)
            item.source = source
            item.update(self.metadata_collector.stat_attrs(st, path))  # can't use FD here?
            return status

    def process_pipe(self, *, path, cache, fd, mode, user, group):
        uid = user2uid(user)
        if uid is None:
            raise Error("no such user: %s" % user)
        gid = group2gid(group)
        if gid is None:
            raise Error("no such group: %s" % group)
        t = int(time.time()) * 1000000000
        item = Item(
            path=path,
            mode=mode & 0o107777 | 0o100000,  # forcing regular file mode
            uid=uid, user=user,
            gid=gid, group=group,
            mtime=t, atime=t, ctime=t,
        )
        self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
        item.get_size(memorize=True)
        self.stats.nfiles += 1
        self.add_item(item, stats=self.stats)
        return 'i'  # stdin
    def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
        with self.create_helper(path, st, None) as (item, status, hardlinked, hardlink_master):  # no status yet
            with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
                with backup_io('fstat'):
                    st = stat_update_check(st, os.fstat(fd))
                item.update(self.metadata_collector.stat_simple_attrs(st))
                is_special_file = is_special(st.st_mode)
                if is_special_file:
                    # we process a special file like a regular file. reflect that in mode,
                    # so it can be extracted / accessed in FUSE mount like a regular file.
                    # this needs to be done early, so that part files also get the patched mode.
                    item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
                if not hardlinked or hardlink_master:
                    if not is_special_file:
                        path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
                        known, ids = cache.file_known_and_unchanged(path_hash, st)
                    else:
                        # in --read-special mode, we may be called for special files.
                        # there should be no information in the cache about special files processed in
                        # read-special mode, but we better play safe as this was wrong in the past:
                        path_hash = None
                        known, ids = False, None
                    chunks = None
                    if ids is not None:
                        # Make sure all ids are available
                        for id_ in ids:
                            if not cache.seen_chunk(id_):
                                status = 'M'  # cache said it is unmodified, but we lost a chunk: process file like modified
                                break
                        else:
                            chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                            status = 'U'  # regular file, unchanged
                    else:
                        status = 'M' if known else 'A'  # regular file, modified or added
                    item.hardlink_master = hardlinked
                    # Only chunkify the file if needed
                    if chunks is not None:
                        item.chunks = chunks
                    else:
                        with backup_io('read'):
                            self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(None, fd)))
                        if is_win32:
                            changed_while_backup = False  # TODO
                        else:
                            with backup_io('fstat2'):
                                st2 = os.fstat(fd)
                            # special files:
                            # - fifos change naturally, because they are fed from the other side. no problem.
                            # - blk/chr devices don't change ctime anyway.
                            changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
                        if changed_while_backup:
                            status = 'C'  # regular file changed while we backed it up, might be inconsistent/corrupt!
                        if not is_special_file and not changed_while_backup:
                            # we must not memorize special files, because the contents of e.g. a
                            # block or char device will change without its mtime/size/inode changing.
                            # also, we must not memorize a potentially inconsistent/corrupt file that
                            # changed while we backed it up.
                            cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
                    self.stats.nfiles += 1
                item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
                item.get_size(memorize=True)
                return status
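    # Summary note (added for readability, not part of the original source): the process_*
    # methods above return one-character status codes, e.g.:
    #   'A' added, 'M' modified, 'U' unchanged, 'C' changed while being backed up,
    #   'h' hardlink to an already seen inode, 'd' directory, 'f' fifo, 's' symlink,
    #   'i' read from stdin; process_dev returns whatever dev_type the caller passed in
    #   (presumably 'b'/'c' for block/char devices).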
def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
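# Illustrative sketch (added, not part of the original source): what valid_msgpacked_dict()
# accepts. In the msgpack format, {'path': 'x'} serializes to b'\x81\xa4path\xa1x':
# 0x81 is a fixmap with one entry, 0xa4 a 4-byte fixstr key ('path'), 0xa1 a 1-byte
# fixstr value. Assuming keys_serialized holds the keys in the same raw/fixstr encoding:
#
#     valid_msgpacked_dict(b'\x81\xa4path\xa1x', [b'\xa4path'])   # -> True
#     valid_msgpacked_dict(b'garbage', [b'\xa4path'])             # -> False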
class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                except (msgpack.UnpackException, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return next(self._unpacker)
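# Illustrative sketch (hypothetical values, added, not part of the original source): how a
# caller resyncs after hitting damaged data in an item metadata stream. This assumes
# borg's msgpack wrapper, which packs/unpacks item keys in the old raw (fixstr/bytes) form:
#
#     unpacker = RobustUnpacker(lambda item: b'path' in item, ['path'])
#     unpacker.resync()
#     unpacker.feed(b'\x00\xff junk' + msgpack.packb({b'path': b'etc/hosts'}))
#     next(unpacker)   # skips the leading junk byte by byte, then yields the item dict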
class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', glob=None,
              verify_data=False, save_space=False):
        """Perform a set of checks on 'repository'

        :param repair: enable repair mode, write updated or corrected data into repository
        :param archive: only check this archive
        :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
        :param glob: only check archives matching this glob
        :param verify_data: integrity verification of data referenced by archives
        :param save_space: Repository.commit(save_space)
        """
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and not any((first, last, glob))
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if verify_data:
            self.verify_data()
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
            except IntegrityErrorBase as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, glob=glob)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found
    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial usable hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom.
        self.chunks = ChunkIndex(usable=len(self.repository) * 1.1)
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)
    def verify_data(self):
        logger.info('Starting cryptographic data integrity verification...')
        chunks_count_index = len(self.chunks)
        chunks_count_segments = 0
        errors = 0
        defect_chunks = []
        pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01,
                                      msgid='check.verify_data')
        marker = None
        while True:
            chunk_ids = self.repository.scan(limit=100, marker=marker)
            if not chunk_ids:
                break
            chunks_count_segments += len(chunk_ids)
            marker = chunk_ids[-1]
            chunk_data_iter = self.repository.get_many(chunk_ids)
            chunk_ids_revd = list(reversed(chunk_ids))
            while chunk_ids_revd:
                pi.show()
                chunk_id = chunk_ids_revd.pop(-1)  # better efficiency
                try:
                    encrypted_data = next(chunk_data_iter)
                except (Repository.ObjectNotFound, IntegrityErrorBase) as err:
                    self.error_found = True
                    errors += 1
                    logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
                    if isinstance(err, IntegrityErrorBase):
                        defect_chunks.append(chunk_id)
                    # as the exception killed our generator, make a new one for the remaining chunks:
                    if chunk_ids_revd:
                        chunk_ids = list(reversed(chunk_ids_revd))
                        chunk_data_iter = self.repository.get_many(chunk_ids)
                else:
                    _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
                    try:
                        self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityErrorBase as integrity_error:
                        self.error_found = True
                        errors += 1
                        logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
                        defect_chunks.append(chunk_id)
        pi.finish()
        if chunks_count_index != chunks_count_segments:
            logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
            logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
                         chunks_count_index, chunks_count_segments)
        if defect_chunks:
            if self.repair:
                # if we kill the defect chunk here, subsequent actions within this "borg check"
                # run will find missing chunks and replace them with all-zero replacement
                # chunks and flag the files as "repaired".
                # if another backup is done later and the missing chunks get backed up again,
                # a "borg check" afterwards can heal all files where this chunk was missing.
                logger.warning('Found defect chunks. They will be deleted now, so affected files can '
                               'get repaired now and maybe healed later.')
                for defect_chunk in defect_chunks:
                    # remote repo (ssh): retry might help for strange network / NIC / RAM errors,
                    # as the chunk will be retransmitted from the remote server.
                    # local repo (fs): as the chunks.iteritems loop usually pumps a lot of data through,
                    # a defect chunk is likely not in the fs cache any more and really gets re-read
                    # from the underlying media.
                    try:
                        encrypted_data = self.repository.get(defect_chunk)
                        _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
                        self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityErrorBase:
                        # failed twice -> get rid of this chunk
                        del self.chunks[defect_chunk]
                        self.repository.delete(defect_chunk)
                        logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk))
                    else:
                        logger.warning('chunk %s not deleted, did not consistently fail.',
                                       bin_to_hex(defect_chunk))
            else:
                logger.warning('Found defect chunks. With --repair, they would get deleted, so affected '
                               'files could get repaired then and maybe healed later.')
                for defect_chunk in defect_chunks:
                    logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
        log = logger.error if errors else logger.info
        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
            chunks_count_segments, errors)
    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)

        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return required_archive_keys.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
        pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01,
                                      msgid='check.rebuild_manifest')
        for chunk_id, _ in self.chunks.iteritems():
            pi.show()
            cdata = self.repository.get(chunk_id)
            try:
                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityErrorBase as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding msgpack with invalid data
            except msgpack.UnpackException:
                continue
            if valid_archive(archive):
                archive = ArchiveItem(internal_dict=archive)
                name = archive.name
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = (chunk_id, archive.time)
        pi.finish()
        logger.info('Manifest rebuild complete.')
        return manifest
    def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', glob=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks (manifest entry might be already deleted from self.chunks)
        self.chunks.pop(Manifest.MANIFEST_ID, None)

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(archive_name, item):
            """Verifies that all file chunks are present.

            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
            """
            def replacement_chunk(size):
                data = bytes(size)
                chunk_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                csize = len(cdata)
                return chunk_id, size, csize, cdata

            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = 'chunks_healthy' in item
            chunks_current = item.chunks
            chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
            if has_chunks_healthy and len(chunks_current) != len(chunks_healthy):
                # should never happen, but there was issue #3218.
                logger.warning('{}: {}: Invalid chunks_healthy metadata removed!'.format(archive_name, item.path))
                del item.chunks_healthy
                has_chunks_healthy = False
                chunks_healthy = chunks_current
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: {}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(
                                         archive_name, item.path, offset, offset + size))
                        self.error_found = chunks_replaced = True
                        chunk_id, size, csize, cdata = replacement_chunk(size)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: {}: Previously missing file chunk is still missing (Byte {}-{}). It has an '
                                    'all-zero replacement chunk already.'.format(
                                        archive_name, item.path, offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        if chunk_id in self.chunks:
                            add_reference(chunk_id, size, csize)
                        else:
                            logger.warning('{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}). '
                                           'Generating new replacement chunk.'.format(
                                               archive_name, item.path, offset, offset + size))
                            self.error_found = chunks_replaced = True
                            chunk_id, size, csize, cdata = replacement_chunk(size)
                            add_reference(chunk_id, size, csize, cdata)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: {}: Healed previously missing file chunk! '
                                    '(Byte {}-{}).'.format(archive_name, item.path, offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is the first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item.chunks_healthy = item.chunks
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: {}: Completely healed previously damaged file!'.format(archive_name, item.path))
                del item.chunks_healthy
            item.chunks = chunk_list
            if 'size' in item:
                item_size = item.size
                item_chunks_size = item.get_size(compressed=False, from_chunks=True)
                if item_size != item_chunks_size:
                    # just warn, but keep the inconsistency, so that borg extract can warn about it.
                    logger.warning('{}: {}: size inconsistency detected: size {}, chunks size {}'.format(
                        archive_name, item.path, item_size, item_chunks_size))
        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
            required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
            unpacker = RobustUnpacker(lambda item: isinstance(item, StableDict) and b'path' in item,
                                      self.manifest.item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see "debug dump-archive-items"
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join((k.decode(errors='replace') if isinstance(k, bytes) else str(k) for k in keys))

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not required_item_keys.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive.items, missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield Item(internal_dict=item)
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
                    except msgpack.UnpackException:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1
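        # Illustrative sketch (hypothetical ids, added, not part of the original source): what
        # missing_chunk_detector does for robust_iterator. The parity of _state tracks
        # "present" (even) vs. "missing" (odd), so itertools.groupby() yields alternating runs:
        #
        #     archive.items        = [id1, id2, missing, id3]
        #     groupby(...) yields    (0, [id1, id2]), (1, [missing]), (2, [id3])
        #
        # Odd-state runs are only reported as missing; for any state > 0 the unpacker is
        # resynced before decrypting and feeding the next run of present chunks.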
        if archive is None:
            sort_by = sort_by.split(',')
            if any((first, last, glob)):
                archive_infos = self.manifest.archives.list(sort_by=sort_by, glob=glob, first=first, last=last)
                if glob and not archive_infos:
                    logger.warning('--glob-archives %s does not match any archives', glob)
                if first and len(archive_infos) < first:
                    logger.warning('--first %d archives: only found %d archives', first, len(archive_infos))
                if last and len(archive_infos) < last:
                    logger.warning('--last %d archives: only found %d archives', last, len(archive_infos))
            else:
                archive_infos = self.manifest.archives.list(sort_by=sort_by)
        else:
            # we only want one specific archive
            try:
                archive_infos = [self.manifest.archives[archive]]
            except KeyError:
                logger.error("Archive '%s' not found.", archive)
                self.error_found = True
                return
        num_archives = len(archive_infos)

        with cache_if_remote(self.repository) as repository:
            for i, info in enumerate(archive_infos):
                logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives))
                archive_id = info.id
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[info.name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
                if archive.version != 1:
                    raise Exception('Unknown archive metadata version')
                archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if 'chunks' in item:
                        verify_file_chunks(info.name, item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive.items:
                    mark_as_possibly_superseded(previous_item_id)
                archive.items = items_buffer.chunks
                data = msgpack.packb(archive.as_dict())
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                self.manifest.archives[info.name] = (new_archive_id, info.ts)
    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair and unused:
                logger.info('Deleting %d orphaned and %d superseded objects...' % (
                    len(orphaned), len(self.possibly_superseded)))
                for id_ in unused:
                    self.repository.delete(id_)
                logger.info('Finished deleting orphaned/superseded objects.')
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            logger.info('Writing Manifest.')
            self.manifest.write()
            logger.info('Committing repo.')
            self.repository.commit(compact=False, save_space=save_space)
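# Illustrative sketch (hypothetical call site, added, not part of the original source):
# a minimal check run as a caller might drive it, assuming an already opened Repository
# instance `repository`:
#
#     checker = ArchiveChecker()
#     ok = checker.check(repository, repair=False, verify_data=True)
#     # ok is True if no problems were found; a second run with repair=True
#     # would write corrected data back into the repository.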
class ArchiveRecreater:
    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
                 chunker_params=None, compression=None, recompress=False, always_recompress=False,
                 dry_run=False, stats=False, progress=False, file_status_printer=None,
                 timestamp=None, checkpoint_interval=1800):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache
        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_exclude_tags = keep_exclude_tags
        self.rechunkify = chunker_params is not None
        if self.rechunkify:
            logger.debug('Rechunking archives to %s', chunker_params)
        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.recompress = recompress
        self.always_recompress = always_recompress
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.timestamp = timestamp
        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.checkpoint_interval = None if dry_run else checkpoint_interval
    def recreate(self, archive_name, comment=None, target_name=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target = self.create_target(archive, target_name)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            return False
        self.process_items(archive, target)
        replace_original = target_name is None
        self.save(archive, target, comment, replace_original=replace_original)
        return True

    def process_items(self, archive, target):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    hardlinkable(item.mode) and
                    item.get('hardlink_master', True) and
                    'source' not in item)

        for item in archive.iter_items():
            if not matcher.match(item.path):
                self.print_file_status('x', item.path)
                if item_is_hardlink_master(item):
                    hardlink_masters[item.path] = (item.get('chunks'), item.get('chunks_healthy'), None)
                continue
            if target_is_subset and hardlinkable(item.mode) and item.get('source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, chunks_healthy, new_source = hardlink_masters[item.source]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item.chunks = chunks
                    if chunks_healthy is not None:
                        item.chunks_healthy = chunks_healthy
                    hardlink_masters[item.source] = (None, None, item.path)
                    del item.source
                else:
                    # Master was already moved, only update this item's source
                    item.source = new_source
            if self.dry_run:
                self.print_file_status('-', item.path)
            else:
                self.process_item(archive, target, item)
        if self.progress:
            target.stats.show_progress(final=True)
    def process_item(self, archive, target, item):
        if 'chunks' in item:
            self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item, stats=target.stats)
        self.print_file_status(file_status(item.mode), item.path)

    def process_chunks(self, archive, target, item):
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item.chunks:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item.chunks
        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
        chunk_processor = partial(self.chunk_processor, target)
        target.process_file_chunks(item, self.cache, target.stats, self.progress, chunk_iterator, chunk_processor)

    def chunk_processor(self, target, chunk):
        # as this is recreate (we do not read from the fs), we never have holes here
        assert chunk.meta['allocation'] == CH_DATA
        data = chunk.data
        chunk_id = self.key.id_hash(data)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
        overwrite = self.recompress
        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
            # Check if this chunk is already compressed the way we want it
            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
            if Compressor.detect(old_chunk).name == self.key.compressor.decide(data).name:
                # Stored chunk has the same compression we wanted
                overwrite = False
        chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
        self.cache.repository.async_response(wait=False)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry

    def iter_chunks(self, archive, target, chunks):
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)
            yield from target.chunker.chunkify(file)
        else:
            for chunk in chunk_iterator:
                yield Chunk(chunk, size=len(chunk), allocation=CH_DATA)
    def save(self, archive, target, comment=None, replace_original=True):
        if self.dry_run:
            return
        if comment is None:
            comment = archive.metadata.get('comment', '')
        # Keep for the statistics if necessary
        if self.stats:
            _start = target.start
        if self.timestamp is None:
            additional_metadata = {
                'time': archive.metadata.time,
                'time_end': archive.metadata.get('time_end') or archive.metadata.time,
                'cmdline': archive.metadata.cmdline,
                # but also remember recreate metadata:
                'recreate_cmdline': sys.argv,
            }
        else:
            additional_metadata = {
                'cmdline': archive.metadata.cmdline,
                # but also remember recreate metadata:
                'recreate_cmdline': sys.argv,
            }
        target.save(comment=comment, timestamp=self.timestamp,
                    stats=target.stats, additional_metadata=additional_metadata)
        if replace_original:
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
        if self.stats:
            target.start = _start
            target.end = datetime.utcnow()
            log_multi(DASHES,
                      str(target),
                      DASHES,
                      str(target.stats),
                      str(self.cache),
                      DASHES)
    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_exclude_tags:
                tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False))
                tagged_dirs.append(FnmatchPattern(dir + '/', recurse_dir=False))
            else:
                tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []

        # to support reading hard-linked CACHEDIR.TAGs (aka CACHE_TAG_NAME), similar to hardlink_masters:
        cachedir_masters = {}

        if self.exclude_caches:
            # sadly, due to how CACHEDIR.TAG works (filename AND file [header] contents) and
            # how borg deals with hardlinks (slave hardlinks referring back to master hardlinks),
            # we need to pass over the archive collecting hardlink master paths.
            # as seen in issue #4911, the master paths can have arbitrary filenames,
            # not just CACHEDIR.TAG.
            for item in archive.iter_items(filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME):
                if stat.S_ISREG(item.mode) and 'chunks' not in item and 'source' in item:
                    # this is a hardlink slave, referring back to its hardlink master (via item.source)
                    cachedir_masters[item.source] = None  # we know the key (path), but not the value (item) yet

        for item in archive.iter_items(
                filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)):
            if self.exclude_caches and item.path in cachedir_masters:
                cachedir_masters[item.path] = item
            dir, tag_file = os.path.split(item.path)
            if tag_file in self.exclude_if_present:
                exclude(dir, item)
            elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode):
                content_item = item if 'chunks' in item else cachedir_masters[item.source]
                file = open_item(archive, content_item)
                if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS:
                    exclude(dir, item)
        matcher.add(tag_files, IECommand.Include)
        matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse)
    def create_target(self, archive, target_name=None):
        """Create target archive."""
        target_name = target_name or archive.name + '.recreate'
        target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
        if len(source_chunker_params) == 4 and isinstance(source_chunker_params[0], int):
            # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash:
            source_chunker_params = (CH_BUZHASH, ) + source_chunker_params
        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
        if target.recreate_rechunkify:
            logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
        target.process_file_chunks = ChunksProcessor(
            cache=self.cache, key=self.key,
            add_item=target.add_item, write_checkpoint=target.write_checkpoint,
            checkpoint_interval=self.checkpoint_interval, rechunkify=target.recreate_rechunkify).process_file_chunks
        target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed)
        return target

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=self.checkpoint_interval)
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
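# Illustrative sketch (hypothetical call site, added, not part of the original source):
# rechunking a single archive in place with ArchiveRecreater, assuming repository,
# manifest, key, cache and matcher objects are already set up by the caller:
#
#     recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
#                                  chunker_params=CHUNKER_PARAMS, recompress=False)
#     if recreater.recreate('my-archive'):
#         manifest.write()
#         repository.commit(compact=False)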