# archive.py

import errno
import json
import os
import socket
import stat
import sys
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby
from shutil import get_terminal_size

import msgpack

from .logger import create_logger
logger = create_logger()

from . import xattr
from .chunker import Chunker
from .cache import ChunkListEntry
from .crypto.key import key_factory
from .compress import Compressor, CompressionSpec
from .constants import *  # NOQA
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Manifest
from .helpers import hardlinkable
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .helpers import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT

has_lchmod = hasattr(os, 'lchmod')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class Statistics:

    def __init__(self, output_json=False):
        self.output_json = output_json
        self.osize = self.csize = self.usize = self.nfiles = 0
        self.last_progress = 0  # timestamp when last progress was shown

    def update(self, size, csize, unique):
        self.osize += size
        self.csize += csize
        if unique:
            self.usize += csize

    summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    def as_dict(self):
        return {
            'original_size': FileSize(self.osize),
            'compressed_size': FileSize(self.csize),
            'deduplicated_size': FileSize(self.usize),
            'nfiles': self.nfiles,
        }

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                data = self.as_dict()
                data.update({
                    'time': time.time(),
                    'type': 'archive_progress',
                    'path': remove_surrogates(item.path if item else ''),
                })
                msg = json.dumps(data)
                end = '\n'
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                    path = remove_surrogates(item.path) if item else ''
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ''
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = ' ' * columns
                end = '\r'
            print(msg, end=end, file=stream or sys.stderr, flush=True)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        if self.op:
            return '%s: %s' % (self.op, self.os_error)
        else:
            return str(self.os_error)


class BackupIO:
    op = ''

    def __call__(self, op=''):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val


backup_io = BackupIO()
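
# Note: backup_io is a single module-level instance that is re-armed with an operation
# name before each risky filesystem call, e.g. (as used further below in this module):
#
#     with backup_io('open'):
#         fd = open(path, 'wb')
#
# Any OSError raised inside such a "with" block is re-raised as BackupOSError, which
# callers treat as a non-critical, per-file warning rather than a fatal error.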


def backup_io_iter(iterator):
    backup_io.op = 'read'
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
                if 'chunks' in item:
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if 'chunks' in item:
                        self.repository.preload([c.id for c in item.chunks])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
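
# DownloadPipeline ties repository fetching and decryption together: fetch_many() yields
# decrypted chunk data for a list of chunk IDs, and unpack_many() feeds that data into a
# msgpack Unpacker to reconstruct Item objects, optionally filtering them and preloading
# their data chunks. Archive.iter_items() below is the main consumer of this pipeline.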


class ChunkBuffer:
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
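
# ChunkBuffer accumulates msgpack-serialized items and cuts the resulting stream into
# chunks with the items chunker; write_chunk() is the storage hook. The CacheChunkBuffer
# subclass below stores those chunks via the cache/repository and records their IDs in
# self.chunks, which later become the archive's item metadata chunk list.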


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
        self.cache.repository.async_response(wait=False)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics(output_json=log_json)
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)
            self.zeros = None

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
        self.name = self.metadata.name
        self.comment = self.metadata.get('comment', '')

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get('time_end') or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start.replace(tzinfo=timezone.utc)
            end = self.end.replace(tzinfo=timezone.utc)
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            'name': self.name,
            'id': self.fpr,
            'start': OutputTimestamp(start),
            'end': OutputTimestamp(end),
            'duration': (end - start).total_seconds(),
            'stats': stats.as_dict(),
            'limits': {
                'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            },
        }
        if self.create:
            info['command_line'] = sys.argv
        else:
            info.update({
                'command_line': self.metadata.cmdline,
                'hostname': self.metadata.hostname,
                'username': self.metadata.username,
                'comment': self.metadata.get('comment', ''),
            })
        return info

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
Utilization of max. archive size: {csize_max:.0%}
'''.format(
            self,
            start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)),
            end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)),
            csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE)

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and 'part' in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
                                              filter=lambda item: self.item_filter(item, filter)):
            yield item

    def add_item(self, item, show_progress=True):
        if show_progress and self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = datetime.utcnow()
            start = end - duration
        else:
            end = timestamp + duration
            start = timestamp
        self.start = start
        self.end = end
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment or '',
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.strftime(ISO_FORMAT),
            'time_end': end.strftime(ISO_FORMAT),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            entry = cache.chunks[id]
            archive_index.add(id, 1, entry.size, entry.csize)

        archive_index = ChunkIndex()
        sync = CacheSynchronizer(archive_index)
        add(self.id)
        pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%')
        for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
            pi.show(increase=1)
            add(id)
            data = self.key.decrypt(id, chunk)
            sync.feed(data)
        stats = Statistics()
        stats.osize, stats.csize, unique_size, stats.usize, unique_chunks, chunks = archive_index.stats_against(cache.chunks)
        stats.nfiles = sync.num_files
        pi.finish()
        return stats
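
    # calc_stats() streams the archive's item metadata through a CacheSynchronizer to build a
    # per-archive chunk index and then compares it against the global chunk index
    # (archive_index.stats_against(cache.chunks)). The "deduplicated size" (usize) is,
    # roughly, the compressed size of chunks not shared with other archives.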

    @contextmanager
    def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters):
        hardlink_set = False
        # Hard link?
        if 'source' in item:
            source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
            chunks, link_target = hardlink_masters.get(item.source, (None, source))
            if link_target:
                # Hard link was extracted previously, just link
                with backup_io('link'):
                    os.link(link_target, path)
                hardlink_set = True
            elif chunks is not None:
                # assign chunks to this item, since the item which had the chunks was not extracted
                item.chunks = chunks
        yield hardlink_set
        if not hardlink_set and hardlink_masters:
            # Update master entry with extracted item path, so that following hardlinks don't extract twice.
            hardlink_masters[item.get('source') or original_path] = (None, path)
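
    # extract_helper() centralizes hard link handling during extraction: if the hard link
    # master was already extracted, the item is just os.link()ed to it; otherwise the
    # extracted path is recorded in hardlink_masters so later links to the same inode are
    # not extracted a second time.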

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        hardlink_masters = hardlink_masters or {}
        has_damaged_chunks = 'chunks_healthy' in item
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
                        sys.stdout.buffer.write(data)
                    item_chunks_size += len(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                            item.path, item_size, item_chunks_size))
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item.path))
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(('/', '..')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io('makedirs'):
                make_parent(path)
            with self.extract_helper(dest, item, path, stripped_components, original_path,
                                     hardlink_masters) as hardlink_set:
                if hardlink_set:
                    return
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * (1 << self.chunker_params[1])
                with backup_io('open'):
                    fd = open(path, 'wb')
                with fd:
                    ids = [c.id for c in item.chunks]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io('write'):
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io('truncate_and_attrs'):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                            item.path, item_size, item_chunks_size))
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item.path))
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item.source
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = 'attrs'
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item.user)
            gid = group2gid(item.group)
        uid = item.uid if uid is None else uid
        gid = item.gid if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.chown(path, uid, gid, follow_symlinks=False)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item.mode)
        elif not symlink:
            os.chmod(path, item.mode)
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item.mode)
        mtime = item.mtime
        if 'atime' in item:
            atime = item.atime
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        try:
            if fd:
                os.utime(fd, None, ns=(atime, mtime))
            else:
                os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        except OSError:
            # some systems don't support calling utime on a symlink
            pass
        acl_set(path, item, self.numeric_owner)
        if 'bsdflags' in item:
            try:
                set_flags(path, item.bsdflags, fd=fd)
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get('xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno == errno.E2BIG:
                    # xattr is too big
                    logger.warning('%s: Value or key of extended attribute %s is too big for this filesystem' %
                                   (path, k.decode()))
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.ENOTSUP:
                    # xattrs not supported here
                    logger.warning('%s: Extended attributes are not supported on this filesystem' % path)
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.EACCES:
                    # permission denied to set this specific xattr (this may happen related to security.* keys)
                    logger.warning('%s: Permission denied when setting extended attribute %s' % (path, k.decode()))
                    set_ec(EXIT_WARNING)
                else:
                    raise

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta('name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        exception_ignored = object()

        def fetch_async_response(wait=True):
            try:
                return self.repository.async_response(wait=wait)
            except Repository.ObjectNotFound as e:
                nonlocal error
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True
                return exception_ignored  # must not return None here

        def chunk_decref(id, stats):
            try:
                self.cache.chunk_decref(id, stats, wait=False)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            else:
                fetch_async_response(wait=False)

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                data = self.key.decrypt(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if 'chunks' in item:
                            for chunk_id, size, csize in item.chunks:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        while fetch_async_response(wait=True) is not None:
            # we did async deletes, process outstanding results (== exceptions),
            # so there is nothing pending when we return and our caller wants to commit.
            pass
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    def stat_simple_attrs(self, st):
        attrs = dict(
            mode=st.st_mode,
            uid=st.st_uid,
            gid=st.st_gid,
            mtime=safe_ns(st.st_mtime_ns),
        )
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            attrs['atime'] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs['ctime'] = safe_ns(st.st_ctime_ns)
        if self.numeric_owner:
            attrs['user'] = attrs['group'] = None
        else:
            attrs['user'] = uid2user(st.st_uid)
            attrs['group'] = gid2group(st.st_gid)
        return attrs

    def stat_ext_attrs(self, st, path):
        attrs = {}
        with backup_io('extended stat'):
            xattrs = xattr.get_all(path, follow_symlinks=False)
            bsdflags = get_flags(path, st)
            acl_get(path, attrs, st, self.numeric_owner)
        if xattrs:
            attrs['xattrs'] = StableDict(xattrs)
        if bsdflags:
            attrs['bsdflags'] = bsdflags
        return attrs

    def stat_attrs(self, st, path):
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path))
        return attrs

    @contextmanager
    def create_helper(self, path, st, status=None, hardlinkable=True):
        safe_path = make_path_safe(path)
        item = Item(path=safe_path)
        hardlink_master = False
        hardlinked = hardlinkable and st.st_nlink > 1
        if hardlinked:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item.source = source
                status = 'h'  # hardlink (to already seen inodes)
            else:
                hardlink_master = True
        yield item, status, hardlinked, hardlink_master
        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
        self.add_item(item)
        # ... and added to the archive, so we can remember it to refer to it later in the archive:
        if hardlink_master:
            self.hard_links[(st.st_ino, st.st_dev)] = safe_path

    def process_dir(self, path, st):
        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            item.update(self.stat_attrs(st, path))
            return status

    def process_fifo(self, path, st):
        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
            item.update(self.stat_attrs(st, path))
            return status

    def process_dev(self, path, st, dev_type):
        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
            item.rdev = st.st_rdev
            item.update(self.stat_attrs(st, path))
            return status

    def process_symlink(self, path, st):
        # note: using hardlinkable=False because we can not support hardlinked symlinks,
        # due to the dual-use of item.source, see issue #2343:
        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            with backup_io('readlink'):
                source = os.readlink(path)
            item.source = source
            if st.st_nlink > 1:
                logger.warning('hardlinked symlinks will be archived as non-hardlinked symlinks!')
            item.update(self.stat_attrs(st, path))
            return status

    def write_part_file(self, item, from_chunk, number):
        item = Item(internal_dict=item.as_dict())
        length = len(item.chunks)
        # the item should only have the *additional* chunks we processed after the last partial item:
        item.chunks = item.chunks[from_chunk:]
        item.get_size(memorize=True)
        item.path += '.borg_part_%d' % number
        item.part = number
        number += 1
        self.add_item(item, show_progress=False)
        self.write_checkpoint()
        return length, number

    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None):
        if not chunk_processor:
            def chunk_processor(data):
                chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

        item.chunks = []
        from_chunk = 0
        part_number = 1
        for data in chunk_iter:
            item.chunks.append(chunk_processor(data))
            if self.show_progress:
                self.stats.show_progress(item=item, dt=0.2)
            if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                self.last_checkpoint = time.monotonic()
        else:
            if part_number > 1:
                if item.chunks[from_chunk:]:
                    # if we already have created a part item inside this file, we want to put the final
                    # chunks (if any) into a part item also (so all parts can be concatenated to get
                    # the complete file):
                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                    self.last_checkpoint = time.monotonic()
                # if we created part files, we have referenced all chunks from the part files,
                # but we also will reference the same chunks also from the final, complete file:
                for chunk in item.chunks:
                    cache.chunk_incref(chunk.id, stats, size=chunk.size)

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        t = int(time.time()) * 1000000000
        item = Item(
            path=path,
            mode=0o100660,  # regular file, ug=rw
            uid=uid, user=uid2user(uid),
            gid=gid, group=gid2group(gid),
            mtime=t, atime=t, ctime=t,
        )
        fd = sys.stdin.buffer  # binary
        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
        item.get_size(memorize=True)
        self.stats.nfiles += 1
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False, files_cache_mode=DEFAULT_FILES_CACHE_MODE):
        with self.create_helper(path, st, None) as (item, status, hardlinked, hardlink_master):  # no status yet
            is_special_file = is_special(st.st_mode)
            if not hardlinked or hardlink_master:
                if not is_special_file:
                    path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
                    ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode, files_cache_mode)
                else:
                    # in --read-special mode, we may be called for special files.
                    # there should be no information in the cache about special files processed in
                    # read-special mode, but we better play safe as this was wrong in the past:
                    path_hash = ids = None
                first_run = not cache.files and cache.do_files
                if first_run:
                    logger.debug('Processing files ...')
                chunks = None
                if ids is not None:
                    # Make sure all ids are available
                    for id_ in ids:
                        if not cache.seen_chunk(id_):
                            break
                    else:
                        chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                        status = 'U'  # regular file, unchanged
                else:
                    status = 'A'  # regular file, added
                item.hardlink_master = hardlinked
                item.update(self.stat_simple_attrs(st))
                # Only chunkify the file if needed
                if chunks is not None:
                    item.chunks = chunks
                else:
                    with backup_io('open'):
                        fh = Archive._open_rb(path)
                    with os.fdopen(fh, 'rb') as fd:
                        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
                    if not is_special_file:
                        # we must not memorize special files, because the contents of e.g. a
                        # block or char device will change without its mtime/size/inode changing.
                        cache.memorize_file(path_hash, st, [c.id for c in item.chunks], files_cache_mode)
                    status = status or 'M'  # regular file, modified (if not 'A' already)
                self.stats.nfiles += 1
            item.update(self.stat_attrs(st, path))
            item.get_size(memorize=True)
            if is_special_file:
                # we processed a special file like a regular file. reflect that in mode,
                # so it can be extracted / accessed in FUSE mount like a regular file:
                item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
            return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name in manifest.archives:
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)
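
# A rough usage sketch of the Archive class (not from this module; the repository, key,
# manifest and cache objects are placeholders for what borg's archiver sets up):
#
#     archive = Archive(repository, key, manifest, 'my-archive', cache=cache, create=True)
#     archive.process_file(path, os.stat(path, follow_symlinks=False), cache)
#     archive.save(comment='nightly backup')
#
# For reading, constructing Archive(...) without create=True loads existing metadata;
# items can then be walked via iter_items() and restored via extract_item().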


def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
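
# Example of the byte layout this checks (assuming the legacy msgpack settings used in
# this module, where short keys serialize as fixstr): msgpack.packb({'version': 1}) ==
# b'\x81\xa7version\x01', i.e. a fixmap header (0x80 | 1), a fixstr key (0xa0 | 7
# followed by b'version') and the value. rebuild_manifest() below relies on the same
# layout when it looks for b'\xa7version\x01' in candidate archive metadata chunks.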


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()
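
# After resync(), RobustUnpacker scans the buffered byte stream one offset at a time: it
# only restarts a fresh Unpacker at positions that look like a serialized item dict
# (valid_msgpacked_dict) and only leaves resync mode once the validator accepts a decoded
# item. This is how the checker code skips over corrupted parts of an item metadata stream.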


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', glob=None,
              verify_data=False, save_space=False):
        """Perform a set of checks on 'repository'

        :param repair: enable repair mode, write updated or corrected data into repository
        :param archive: only check this archive
        :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
        :param glob: only check archives matching this glob
        :param verify_data: integrity verification of data referenced by archives
        :param save_space: Repository.commit(save_space)
        """
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and not any((first, last, glob))
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if verify_data:
            self.verify_data()
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
            except IntegrityError as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, glob=glob)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)

    def verify_data(self):
        logger.info('Starting cryptographic data integrity verification...')
        chunks_count_index = len(self.chunks)
        chunks_count_segments = 0
        errors = 0
        defect_chunks = []
        pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01,
                                      msgid='check.verify_data')
        marker = None
        while True:
            chunk_ids = self.repository.scan(limit=100, marker=marker)
            if not chunk_ids:
                break
            chunks_count_segments += len(chunk_ids)
            marker = chunk_ids[-1]
            chunk_data_iter = self.repository.get_many(chunk_ids)
            chunk_ids_revd = list(reversed(chunk_ids))
            while chunk_ids_revd:
                pi.show()
                chunk_id = chunk_ids_revd.pop(-1)  # better efficiency
                try:
                    encrypted_data = next(chunk_data_iter)
                except (Repository.ObjectNotFound, IntegrityError) as err:
                    self.error_found = True
                    errors += 1
                    logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
                    if isinstance(err, IntegrityError):
                        defect_chunks.append(chunk_id)
                    # as the exception killed our generator, make a new one for remaining chunks:
                    if chunk_ids_revd:
                        chunk_ids = list(reversed(chunk_ids_revd))
                        chunk_data_iter = self.repository.get_many(chunk_ids)
                else:
                    _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
                    try:
                        self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityError as integrity_error:
                        self.error_found = True
                        errors += 1
                        logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
                        defect_chunks.append(chunk_id)
        pi.finish()
        if chunks_count_index != chunks_count_segments:
            logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
            logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
                         chunks_count_index, chunks_count_segments)
        if defect_chunks:
            if self.repair:
                # if we kill the defect chunk here, subsequent actions within this "borg check"
                # run will find missing chunks and replace them with all-zero replacement
                # chunks and flag the files as "repaired".
                # if another backup is done later and the missing chunks get backed up again,
                # a "borg check" afterwards can heal all files where this chunk was missing.
                logger.warning('Found defect chunks. They will be deleted now, so affected files can '
                               'get repaired now and maybe healed later.')
                for defect_chunk in defect_chunks:
                    # remote repo (ssh): retry might help for strange network / NIC / RAM errors
                    # as the chunk will be retransmitted from remote server.
                    # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
                    # a defect chunk is likely not in the fs cache any more and really gets re-read
                    # from the underlying media.
                    try:
                        encrypted_data = self.repository.get(defect_chunk)
                        _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
                        self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityError:
                        # failed twice -> get rid of this chunk
                        del self.chunks[defect_chunk]
                        self.repository.delete(defect_chunk)
                        logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk))
                    else:
                        logger.warning('chunk %s not deleted, did not consistently fail.', bin_to_hex(defect_chunk))
            else:
                logger.warning('Found defect chunks. With --repair, they would get deleted, so affected '
                               'files could get repaired then and maybe healed later.')
                for defect_chunk in defect_chunks:
                    logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
        log = logger.error if errors else logger.info
        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
            chunks_count_segments, errors)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)

        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return required_archive_keys.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                archive = ArchiveItem(internal_dict=archive)
                name = archive.name
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = (chunk_id, archive.time)
        logger.info('Manifest rebuild complete.')
        return manifest
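
    # Caller sketch (an assumption about the check driver earlier in this class, shown
    # here only for orientation): rebuild_manifest() is only useful when the manifest
    # chunk itself is missing or unreadable, roughly
    #
    #     if manifest_is_missing:                     # hypothetical condition name
    #         self.manifest = self.rebuild_manifest()
    #     self.rebuild_refcounts(...)
    #     self.orphan_chunks_check()
    #     self.finish()                               # writes the manifest and commits when repairing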

    def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', glob=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks (manifest entry might be already deleted from self.chunks)
        self.chunks.pop(Manifest.MANIFEST_ID, None)

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
                self.possibly_superseded.add(id_)
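
        # Anything recorded in self.possibly_superseded is a chunk that this repair run
        # itself made unreferenced (old archive/item metadata, obsolete all-zero
        # replacement chunks). orphan_chunks_check() excludes these ids from the
        # "orphaned objects" error count, but still deletes them together with all
        # other unused chunks when repairing.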

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present.

            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
            """
            def replacement_chunk(size):
                data = bytes(size)
                chunk_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                csize = len(cdata)
                return chunk_id, size, csize, cdata
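
            # A replacement chunk is just `size` zero bytes; its id is the hash of that
            # zero buffer, so equal-sized replacements share one stored chunk, and the
            # repaired file keeps its original length even though the data is lost.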

            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = 'chunks_healthy' in item
            chunks_current = item.chunks
            chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
            assert len(chunks_current) == len(chunks_healthy)
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(item.path, offset, offset + size))
                        self.error_found = chunks_replaced = True
                        chunk_id, size, csize, cdata = replacement_chunk(size)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). It has an '
                                    'all-zero replacement chunk already.'.format(item.path, offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        if chunk_id in self.chunks:
                            add_reference(chunk_id, size, csize)
                        else:
                            logger.warning('{}: Missing all-zero replacement chunk detected (Byte {}-{}). '
                                           'Generating new replacement chunk.'.format(item.path, offset, offset + size))
                            self.error_found = chunks_replaced = True
                            chunk_id, size, csize, cdata = replacement_chunk(size)
                            add_reference(chunk_id, size, csize, cdata)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: Healed previously missing file chunk! '
                                    '(Byte {}-{}).'.format(item.path, offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is the first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item.chunks_healthy = item.chunks
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: Completely healed previously damaged file!'.format(item.path))
                del item.chunks_healthy
            item.chunks = chunk_list
            if 'size' in item:
                item_size = item.size
                item_chunks_size = item.get_size(compressed=False, from_chunks=True)
                if item_size != item_chunks_size:
                    # just warn, but keep the inconsistency, so that borg extract can warn about it.
                    logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                        item.path, item_size, item_chunks_size))
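
        # chunks_healthy lifecycle: on the first repair of a file the original chunk id
        # list is preserved as item.chunks_healthy while item.chunks points at the
        # all-zero replacements; once every healthy chunk is available again the file is
        # reported as completely healed and chunks_healthy is removed.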

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
            required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and 'path' in item,
                                      self.manifest.item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state
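
            # _state stays even while the current run of item metadata chunk ids is
            # present in self.chunks and becomes odd while a run is missing, so the
            # groupby() in the loop below yields alternating present/missing runs,
            # e.g. ids [ok, ok, missing, ok] get states 0, 0, 1, 2.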

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see "debug dump-archive-items"
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join((k.decode() if isinstance(k, bytes) else str(k) for k in keys))

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not required_item_keys.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive.items, missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield Item(internal_dict=item)
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason,
                                       chunk_id, i)
                    except RobustUnpacker.UnpackerCrashed:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            sort_by = sort_by.split(',')
            if any((first, last, glob)):
                archive_infos = self.manifest.archives.list(sort_by=sort_by, glob=glob, first=first, last=last)
                if glob and not archive_infos:
                    logger.warning('--glob-archives %s does not match any archives', glob)
                if first and len(archive_infos) < first:
                    logger.warning('--first %d archives: only found %d archives', first, len(archive_infos))
                if last and len(archive_infos) < last:
                    logger.warning('--last %d archives: only found %d archives', last, len(archive_infos))
            else:
                archive_infos = self.manifest.archives.list(sort_by=sort_by)
        else:
            # we only want one specific archive
            try:
                archive_infos = [self.manifest.archives[archive]]
            except KeyError:
                logger.error("Archive '%s' not found.", archive)
                self.error_found = True
                return
        num_archives = len(archive_infos)

        with cache_if_remote(self.repository) as repository:
            for i, info in enumerate(archive_infos):
                logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives))
                archive_id = info.id
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[info.name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
                if archive.version != 1:
                    raise Exception('Unknown archive metadata version')
                archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if 'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive.items:
                    mark_as_possibly_superseded(previous_item_id)
                archive.items = items_buffer.chunks
                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                self.manifest.archives[info.name] = (new_archive_id, info.ts)

    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)


class ArchiveRecreater:
    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
                 chunker_params=None, compression=None, recompress=False, always_recompress=False,
                 dry_run=False, stats=False, progress=False, file_status_printer=None,
                 checkpoint_interval=1800):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache
        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_exclude_tags = keep_exclude_tags
        self.rechunkify = chunker_params is not None
        if self.rechunkify:
            logger.debug('Rechunking archives to %s', chunker_params)
        self.chunker_params = chunker_params or CHUNKER_PARAMS
        self.recompress = recompress
        self.always_recompress = always_recompress
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.checkpoint_interval = None if dry_run else checkpoint_interval

    def recreate(self, archive_name, comment=None, target_name=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target = self.create_target(archive, target_name)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            return False
        self.process_items(archive, target)
        replace_original = target_name is None
        self.save(archive, target, comment, replace_original=replace_original)
        return True
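
    # Usage sketch (an assumption about the calling side, which is not part of this
    # section): a recreate command would construct the recreater once and call
    # recreate() per archive name, e.g.
    #
    #     recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
    #                                  recompress=True, compression=CompressionSpec('lz4'))
    #     if not recreater.recreate('my-archive', comment='recompressed'):
    #         logger.info('nothing to do')
    #
    # recreate() returns False when the matcher is empty and neither recompression,
    # rechunking nor a comment change is requested.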

    def process_items(self, archive, target):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    hardlinkable(item.mode) and
                    item.get('hardlink_master', True) and
                    'source' not in item)

        for item in archive.iter_items():
            if not matcher.match(item.path):
                self.print_file_status('x', item.path)
                if item_is_hardlink_master(item):
                    hardlink_masters[item.path] = (item.get('chunks'), None)
                continue
            if target_is_subset and hardlinkable(item.mode) and item.get('source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item.source]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item.chunks = chunks
                    hardlink_masters[item.source] = (None, item.path)
                    del item.source
                else:
                    # Master was already moved, only update this item's source
                    item.source = new_source
            if self.dry_run:
                self.print_file_status('-', item.path)
            else:
                self.process_item(archive, target, item)
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if 'chunks' in item:
            self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item.mode), item.path)

    def process_chunks(self, archive, target, item):
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item.chunks:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item.chunks
        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
        chunk_processor = partial(self.chunk_processor, target)
        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)

    def chunk_processor(self, target, data):
        chunk_id = self.key.id_hash(data)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
        overwrite = self.recompress
        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
            # Check if this chunk is already compressed the way we want it
            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
            if Compressor.detect(old_chunk).name == self.key.compressor.decide(data).name:
                # Stored chunk has the same compression we wanted
                overwrite = False
        chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
        self.cache.repository.async_response(wait=False)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry
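
    # Note on recompression: with recompress (but not always_recompress), a stored chunk
    # is left untouched when the detected compressor name matches the one decide() would
    # pick for the new data; only the algorithm name is compared, so chunks that differ
    # merely in compression level are not rewritten.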

    def iter_chunks(self, archive, target, chunks):
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)
            yield from target.chunker.chunkify(file)
        else:
            for chunk in chunk_iterator:
                yield chunk

    def save(self, archive, target, comment=None, replace_original=True):
        if self.dry_run:
            return
        if comment is None:
            comment = archive.metadata.get('comment', '')
        target.save(comment=comment, additional_metadata={
            # keep some metadata as in original archive:
            'time': archive.metadata.time,
            'time_end': archive.metadata.time_end,
            'cmdline': archive.metadata.cmdline,
            # but also remember recreate metadata:
            'recreate_cmdline': sys.argv,
        })
        if replace_original:
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
        if self.stats:
            target.end = datetime.utcnow()
            log_multi(DASHES,
                      str(target),
                      DASHES,
                      str(target.stats),
                      str(self.cache),
                      DASHES)

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_caches and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_exclude_tags:
                tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False))
                tagged_dirs.append(FnmatchPattern(dir + '/', recurse_dir=False))
            else:
                tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []

        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item.path.endswith(CACHE_TAG_NAME) or matcher.match(item.path)):
            if item.path.endswith(CACHE_TAG_NAME):
                cachedir_masters[item.path] = item
            dir, tag_file = os.path.split(item.path)
            if tag_file in self.exclude_if_present:
                exclude(dir, item)
            if stat.S_ISREG(item.mode):
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if 'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item.source])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
        matcher.add(tag_files, IECommand.Include)
        matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse)
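
    # Net effect: with keep_exclude_tags the tag files themselves stay included
    # (PathPrefixPattern on the tag path) while the rest of the tagged directory is
    # excluded via FnmatchPattern('dir/'); without it, the whole tagged directory is
    # excluded via a PathPrefixPattern on the directory.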

    def create_target(self, archive, target_name=None):
        """Create target archive."""
        target_name = target_name or archive.name + '.recreate'
        target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
        if target.recreate_rechunkify:
            logger.debug('Rechunking archive from %s to %s',
                         source_chunker_params or '(unknown)', target.chunker_params)
        return target

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=self.checkpoint_interval)
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)