# archive.py

import errno
import json
import os
import socket
import stat
import sys
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby
from shutil import get_terminal_size

import msgpack

from .logger import create_logger
logger = create_logger()

from . import xattr
from .cache import ChunkListEntry
from .chunker import Chunker
from .compress import Compressor
from .constants import *  # NOQA
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Manifest
from .helpers import Chunk, ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError
from .helpers import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import format_time, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import PathPrefixPattern, FnmatchPattern
from .helpers import CompressionDecider1, CompressionDecider2, CompressionSpec
from .item import Item, ArchiveItem
from .key import key_factory
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT

has_lchmod = hasattr(os, 'lchmod')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class Statistics:

    def __init__(self, output_json=False):
        self.output_json = output_json
        self.osize = self.csize = self.usize = self.nfiles = 0
        self.last_progress = 0  # timestamp when last progress was shown

    def update(self, size, csize, unique):
        self.osize += size
        self.csize += csize
        if unique:
            self.usize += csize

    summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    def as_dict(self):
        return {
            'original_size': FileSize(self.osize),
            'compressed_size': FileSize(self.csize),
            'deduplicated_size': FileSize(self.usize),
            'nfiles': self.nfiles,
        }

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                data = self.as_dict()
                data.update({
                    'type': 'archive_progress',
                    'path': remove_surrogates(item.path if item else ''),
                })
                msg = json.dumps(data)
                end = '\n'
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                    path = remove_surrogates(item.path) if item else ''
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ''
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = ' ' * columns
                end = '\r'
            print(msg, end=end, file=stream or sys.stderr, flush=True)
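
# Note on the counters above: osize/csize accumulate the original and compressed size of
# every chunk that update() sees, while usize only grows for chunks the caller flags as
# unique, which is how the deduplicated size of an archive is derived.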


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        if self.op:
            return '%s: %s' % (self.op, self.os_error)
        else:
            return str(self.os_error)


class BackupIO:
    op = ''

    def __call__(self, op=''):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val


backup_io = BackupIO()
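
# Typical usage of the shared backup_io instance, as seen throughout this module: name the
# operation, then run the filesystem call inside the context manager, e.g.
#
#     with backup_io('open'):
#         fd = open(path, 'wb')
#
# Any OSError raised inside the block is re-raised as BackupOSError('open', ...), which
# callers treat as a non-critical, per-file warning rather than a fatal error.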


def backup_io_iter(iterator):
    backup_io.op = 'read'
    while True:
        try:
            with backup_io:
                item = next(iterator)
        except StopIteration:
            return
        yield item


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for _, data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
                if 'chunks' in item:
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if 'chunks' in item:
                        self.repository.preload([c.id for c in item.chunks])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1].data)

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
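
# Buffering behaviour: add() packs items into self.buffer; once BUFFER_SIZE is exceeded,
# flush() chunkifies the buffered item stream and writes out every chunk except the last,
# still-growing one, which stays in the buffer. flush(flush=True) also writes that final
# partial chunk, which Archive.save() relies on to finalize the item stream.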


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None, compression=None, compression_files=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics(output_json=log_json)
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
            self.file_compression_logger = create_logger('borg.debug.file-compression')
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                            compression_files or [])
            key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        _, data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
        self.name = self.metadata.name

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get('time_end') or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start.replace(tzinfo=timezone.utc)
            end = self.end.replace(tzinfo=timezone.utc)
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            'name': self.name,
            'id': self.fpr,
            'start': format_time(to_localtime(start)),
            'end': format_time(to_localtime(end)),
            'duration': (end - start).total_seconds(),
            'stats': stats.as_dict(),
            'limits': {
                'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            },
        }
        if self.create:
            info['command_line'] = sys.argv
        else:
            info.update({
                'command_line': self.metadata.cmdline,
                'hostname': self.metadata.hostname,
                'username': self.metadata.username,
                'comment': self.metadata.get('comment', ''),
            })
        return info

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
Utilization of max. archive size: {csize_max:.0%}
'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))),
            csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE)

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and 'part' in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
                                              filter=lambda item: self.item_filter(item, filter)):
            yield item

    def add_item(self, item, show_progress=True):
        if show_progress and self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            self.end = datetime.utcnow()
            self.start = self.end - duration
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            self.start = timestamp - duration
            end = timestamp
            start = self.start
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment or '',
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, Chunk(data), self.stats)
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
            add(id)
            _, data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                chunks = item.get(b'chunks')
                if chunks is not None:
                    stats.nfiles += 1
                    add_file_chunks(chunks)
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        hardlink_masters = hardlink_masters or {}
        has_damaged_chunks = 'chunks_healthy' in item
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
                for _, data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
                        sys.stdout.buffer.write(data)
                    item_chunks_size += len(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                            item.path, item_size, item_chunks_size))
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item.path))
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(('/', '..')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io('makedirs'):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
                with backup_io('link'):
                    if os.path.exists(path):
                        os.unlink(path)
                    if item.source not in hardlink_masters:
                        os.link(source, path)
                        return
                item.chunks, link_target = hardlink_masters[item.source]
                if link_target:
                    # Hard link was extracted previously, just link
                    with backup_io:
                        os.link(link_target, path)
                    return
                # Extract chunks, since the item which had the chunks was not extracted
            with backup_io('open'):
                fd = open(path, 'wb')
            with fd:
                ids = [c.id for c in item.chunks]
                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    with backup_io('write'):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                with backup_io('truncate'):
                    pos = item_chunks_size = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
            if 'size' in item:
                item_size = item.size
                if item_size != item_chunks_size:
                    logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                        item.path, item_size, item_chunks_size))
            if has_damaged_chunks:
                logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                               remove_surrogates(item.path))
            if hardlink_masters:
                # Update master entry with extracted file path, so that following hardlinks don't extract twice.
                hardlink_masters[item.get('source') or original_path] = (None, path)
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                if not os.path.exists(path):
                    os.makedirs(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item.source
                if os.path.exists(path):
                    os.unlink(path)
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                os.mknod(path, item.mode, item.rdev)
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = 'attrs'
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item.user)
            gid = group2gid(item.group)
        uid = item.uid if uid is None else uid
        gid = item.gid if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item.mode)
        elif not symlink:
            os.chmod(path, item.mode)
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item.mode)
        mtime = item.mtime
        if 'atime' in item:
            atime = item.atime
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        try:
            if fd:
                os.utime(fd, None, ns=(atime, mtime))
            else:
                os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        except OSError:
            # some systems don't support calling utime on a symlink
            pass
        acl_set(path, item, self.numeric_owner)
        if 'bsdflags' in item:
            try:
                set_flags(path, item.bsdflags, fd=fd)
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get('xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, Chunk(data), self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta('name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        def chunk_decref(id, stats):
            nonlocal error
            try:
                self.cache.chunk_decref(id, stats)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            except Repository.ObjectNotFound as e:
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                _, data = self.key.decrypt(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if 'chunks' in item:
                            for chunk_id, size, csize in item.chunks:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    def stat_simple_attrs(self, st):
        attrs = dict(
            mode=st.st_mode,
            uid=st.st_uid,
            gid=st.st_gid,
            mtime=st.st_mtime_ns,
        )
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            attrs['atime'] = st.st_atime_ns
        if not self.noctime:
            attrs['ctime'] = st.st_ctime_ns
        if self.numeric_owner:
            attrs['user'] = attrs['group'] = None
        else:
            attrs['user'] = uid2user(st.st_uid)
            attrs['group'] = gid2group(st.st_gid)
        return attrs

    def stat_ext_attrs(self, st, path):
        attrs = {}
        with backup_io('extended stat'):
            xattrs = xattr.get_all(path, follow_symlinks=False)
            bsdflags = get_flags(path, st)
            acl_get(path, attrs, st, self.numeric_owner)
        if xattrs:
            attrs['xattrs'] = StableDict(xattrs)
        if bsdflags:
            attrs['bsdflags'] = bsdflags
        return attrs

    def stat_attrs(self, st, path):
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path))
        return attrs

    def process_dir(self, path, st):
        item = Item(path=make_path_safe(path))
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = Item(path=make_path_safe(path))
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = Item(path=make_path_safe(path), rdev=st.st_rdev)
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        with backup_io('readlink'):
            source = os.readlink(path)
        item = Item(path=make_path_safe(path), source=source)
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def write_part_file(self, item, from_chunk, number):
        item = Item(internal_dict=item.as_dict())
        length = len(item.chunks)
        # the item should only have the *additional* chunks we processed after the last partial item:
        item.chunks = item.chunks[from_chunk:]
        item.get_size(memorize=True)
        item.path += '.borg_part_%d' % number
        item.part = number
        number += 1
        self.add_item(item, show_progress=False)
        self.write_checkpoint()
        return length, number

    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
        if not chunk_processor:
            def chunk_processor(data):
                return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)

        item.chunks = []
        from_chunk = 0
        part_number = 1
        for data in chunk_iter:
            item.chunks.append(chunk_processor(data))
            if self.show_progress:
                self.stats.show_progress(item=item, dt=0.2)
            if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                self.last_checkpoint = time.monotonic()
        else:
            if part_number > 1:
                if item.chunks[from_chunk:]:
                    # if we already have created a part item inside this file, we want to put the final
                    # chunks (if any) into a part item also (so all parts can be concatenated to get
                    # the complete file):
                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                    self.last_checkpoint = time.monotonic()
                # if we created part files, we have referenced all chunks from the part files,
                # but we also will reference the same chunks also from the final, complete file:
                for chunk in item.chunks:
                    cache.chunk_incref(chunk.id, stats)
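
    # chunk_file() cooperates with write_part_file(): whenever checkpoint_interval elapses,
    # the chunks processed so far are saved as a '<path>.borg_part_<n>' item and a checkpoint
    # archive is written, so an interrupted backup still references the data stored so far.
    # The final, complete item references all of its chunks again (see the incref loop above),
    # so the finished file appears whole even though part items were written along the way.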

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        t = int(time.time()) * 1000000000
        item = Item(
            path=path,
            mode=0o100660,  # regular file, ug=rw
            uid=uid, user=uid2user(uid),
            gid=gid, group=gid2group(gid),
            mtime=t, atime=t, ctime=t,
        )
        fd = sys.stdin.buffer  # binary
        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
        item.get_size(memorize=True)
        self.stats.nfiles += 1
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = Item(path=safe_path, source=source)
                item.update(self.stat_attrs(st, path))
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
        is_special_file = is_special(st.st_mode)
        if not is_special_file:
            path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files and cache.do_files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = Item(
            path=safe_path,
            hardlink_master=st.st_nlink > 1,  # item is a hard link and has the chunks
        )
        item.update(self.stat_simple_attrs(st))
        # Only chunkify the file if needed
        if chunks is not None:
            item.chunks = chunks
        else:
            compress = self.compression_decider1.decide(path)
            self.file_compression_logger.debug('%s -> compression %s', path, compress['name'])
            with backup_io('open'):
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress)
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item.update(self.stat_attrs(st, path))
        item.get_size(memorize=True)
        if is_special_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in FUSE mount like a regular file:
            item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
        self.stats.nfiles += 1
        self.add_item(item)
        if st.st_nlink > 1 and source is None:
            # Add the hard link reference *after* the file has been added to the archive.
            self.hard_links[st.st_ino, st.st_dev] = safe_path
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name in manifest.archives:
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
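
# The byte checks above follow the msgpack format: 0x80..0x8f is a fixmap header, 0xde is a
# map16 header, 0xa0..0xbf is a fixstr (short raw) key and 0xd9/0xda/0xdb are str8/str16/str32
# keys, so the function only accepts data that starts like a map whose first key is a string.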


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()
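
# Resync strategy: after resync() is called, feed() only buffers raw bytes. __next__ then scans
# the buffer one byte at a time and only restarts a fresh Unpacker at offsets where
# valid_msgpacked_dict() says the bytes look like a serialized item dict; a candidate item must
# additionally pass the validator callback before normal (non-resync) unpacking resumes.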


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', prefix='',
              verify_data=False, save_space=False):
        """Perform a set of checks on 'repository'

        :param repair: enable repair mode, write updated or corrected data into repository
        :param archive: only check this archive
        :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
        :param prefix: only check archives with this prefix
        :param verify_data: integrity verification of data referenced by archives
        :param save_space: Repository.commit(save_space)
        """
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and not any((first, last, prefix))
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if verify_data:
            self.verify_data()
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, key=self.key)
            except IntegrityError as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            if not result:
                break
            marker = result[-1]
            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)

    def verify_data(self):
        logger.info('Starting cryptographic data integrity verification...')
        chunks_count_index = len(self.chunks)
        chunks_count_segments = 0
        errors = 0
        defect_chunks = []
        pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01,
                                      msgid='check.verify_data')
        marker = None
        while True:
            chunk_ids = self.repository.scan(limit=100, marker=marker)
            if not chunk_ids:
                break
            chunks_count_segments += len(chunk_ids)
            marker = chunk_ids[-1]
            chunk_data_iter = self.repository.get_many(chunk_ids)
            chunk_ids_revd = list(reversed(chunk_ids))
            while chunk_ids_revd:
                pi.show()
                chunk_id = chunk_ids_revd.pop(-1)  # better efficiency
                try:
                    encrypted_data = next(chunk_data_iter)
                except (Repository.ObjectNotFound, IntegrityError) as err:
                    self.error_found = True
                    errors += 1
                    logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
                    if isinstance(err, IntegrityError):
                        defect_chunks.append(chunk_id)
                    # as the exception killed our generator, make a new one for remaining chunks:
                    if chunk_ids_revd:
                        chunk_ids = list(reversed(chunk_ids_revd))
                        chunk_data_iter = self.repository.get_many(chunk_ids)
                else:
                    try:
                        _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
                        _, data = self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityError as integrity_error:
                        self.error_found = True
                        errors += 1
                        logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
                        defect_chunks.append(chunk_id)
        pi.finish()
        if chunks_count_index != chunks_count_segments:
            logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
            logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
                         chunks_count_index, chunks_count_segments)
        if defect_chunks:
            if self.repair:
                # if we kill the defect chunk here, subsequent actions within this "borg check"
                # run will find missing chunks and replace them with all-zero replacement
                # chunks and flag the files as "repaired".
                # if another backup is done later and the missing chunks get backupped again,
                # a "borg check" afterwards can heal all files where this chunk was missing.
                logger.warning('Found defect chunks. They will be deleted now, so affected files can '
                               'get repaired now and maybe healed later.')
                for defect_chunk in defect_chunks:
                    # remote repo (ssh): retry might help for strange network / NIC / RAM errors
                    # as the chunk will be retransmitted from remote server.
                    # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
                    # a defect chunk is likely not in the fs cache any more and really gets re-read
                    # from the underlying media.
                    encrypted_data = self.repository.get(defect_chunk)
                    try:
                        _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
                        self.key.decrypt(_chunk_id, encrypted_data)
                    except IntegrityError:
                        # failed twice -> get rid of this chunk
                        del self.chunks[defect_chunk]
                        self.repository.delete(defect_chunk)
                        logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk))
                    else:
                        logger.warning('chunk %s not deleted, did not consistently fail.', bin_to_hex(defect_chunk))
            else:
                logger.warning('Found defect chunks. With --repair, they would get deleted, so affected '
                               'files could get repaired then and maybe healed later.')
                for defect_chunk in defect_chunks:
                    logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
        log = logger.error if errors else logger.info
        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
            chunks_count_segments, errors)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)

        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return required_archive_keys.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
                _, data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                archive = ArchiveItem(internal_dict=archive)
                name = archive.name
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = (chunk_id, archive.time)
        logger.info('Manifest rebuild complete.')
        return manifest
  1164. def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', prefix=''):
  1165. """Rebuild object reference counts by walking the metadata
  1166. Missing and/or incorrect data is repaired when detected
  1167. """
  1168. # Exclude the manifest from chunks
  1169. del self.chunks[Manifest.MANIFEST_ID]
  1170. def mark_as_possibly_superseded(id_):
  1171. if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
  1172. self.possibly_superseded.add(id_)
  1173. def add_callback(chunk):
  1174. id_ = self.key.id_hash(chunk.data)
  1175. cdata = self.key.encrypt(chunk)
  1176. add_reference(id_, len(chunk.data), len(cdata), cdata)
  1177. return id_
  1178. def add_reference(id_, size, csize, cdata=None):
  1179. try:
  1180. self.chunks.incref(id_)
  1181. except KeyError:
  1182. assert cdata is not None
  1183. self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
  1184. if self.repair:
  1185. self.repository.put(id_, cdata)
  1186. def verify_file_chunks(item):
  1187. """Verifies that all file chunks are present.
  1188. Missing file chunks will be replaced with new chunks of the same length containing all zeros.
  1189. If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
  1190. """
  1191. def replacement_chunk(size):
  1192. data = bytes(size)
  1193. chunk_id = self.key.id_hash(data)
  1194. cdata = self.key.encrypt(Chunk(data))
  1195. csize = len(cdata)
  1196. return chunk_id, size, csize, cdata
            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = 'chunks_healthy' in item
            chunks_current = item.chunks
            chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
            assert len(chunks_current) == len(chunks_healthy)
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(item.path, offset, offset + size))
                        self.error_found = chunks_replaced = True
                        chunk_id, size, csize, cdata = replacement_chunk(size)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). It has an '
                                    'all-zero replacement chunk already.'.format(item.path, offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        if chunk_id in self.chunks:
                            add_reference(chunk_id, size, csize)
                        else:
                            logger.warning('{}: Missing all-zero replacement chunk detected (Byte {}-{}). '
                                           'Generating new replacement chunk.'.format(item.path, offset, offset + size))
                            self.error_found = chunks_replaced = True
                            chunk_id, size, csize, cdata = replacement_chunk(size)
                            add_reference(chunk_id, size, csize, cdata)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: Healed previously missing file chunk! '
                                    '(Byte {}-{}).'.format(item.path, offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is the first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item.chunks_healthy = item.chunks
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: Completely healed previously damaged file!'.format(item.path))
                del item.chunks_healthy
            item.chunks = chunk_list
            if 'size' in item:
                item_size = item.size
                item_chunks_size = item.get_size(compressed=False, from_chunks=True)
                if item_size != item_chunks_size:
                    # just warn, but keep the inconsistency, so that borg extract can warn about it.
                    logger.warning('{}: size inconsistency detected: size {}, chunks size {}'.format(
                        item.path, item_size, item_chunks_size))

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
            required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and 'path' in item,
                                      self.manifest.item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state
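
            # missing_chunk_detector returns a counter that is bumped whenever the
            # present/missing status of the metadata chunk stream flips, so the groupby()
            # below yields maximal runs of consecutive chunk IDs; runs with an odd state
            # consist entirely of missing chunks and are reported and skipped.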

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see "debug dump-archive-items"
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join(k.decode() if isinstance(k, bytes) else str(k) for k in keys)

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not required_item_keys.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive.items, missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    _, data = self.key.decrypt(chunk_id, cdata)
                    unpacker.feed(data)
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield Item(internal_dict=item)
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason,
                                       chunk_id, i)
                    except RobustUnpacker.UnpackerCrashed:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1
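
        # Select the archives to check: with archive=None all archives are processed
        # (optionally narrowed by --first/--last/--prefix), otherwise only the named one.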

        if archive is None:
            sort_by = sort_by.split(',')
            if any((first, last, prefix)):
                archive_infos = self.manifest.archives.list(sort_by=sort_by, prefix=prefix, first=first, last=last)
                if prefix and not archive_infos:
                    logger.warning('--prefix %s does not match any archives', prefix)
                if first and len(archive_infos) < first:
                    logger.warning('--first %d archives: only found %d archives', first, len(archive_infos))
                if last and len(archive_infos) < last:
                    logger.warning('--last %d archives: only found %d archives', last, len(archive_infos))
            else:
                archive_infos = self.manifest.archives.list(sort_by=sort_by)
        else:
            # we only want one specific archive
            try:
                archive_infos = [self.manifest.archives[archive]]
            except KeyError:
                logger.error("Archive '%s' not found.", archive)
                self.error_found = True
                return
        num_archives = len(archive_infos)

        with cache_if_remote(self.repository) as repository:
            for i, info in enumerate(archive_infos):
                logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives))
                archive_id = info.id
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[info.name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                _, data = self.key.decrypt(archive_id, cdata)
                archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
                if archive.version != 1:
                    raise Exception('Unknown archive metadata version')
                archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if 'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive.items:
                    mark_as_possibly_superseded(previous_item_id)
                archive.items = items_buffer.chunks
                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(Chunk(data))
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                self.manifest.archives[info.name] = (new_archive_id, info.ts)

    def orphan_chunks_check(self):
        if self.check_all:
            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)
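

# ArchiveRecreater drives re-creation of existing archives with different exclusion,
# chunker or compression settings (this is what the `borg recreate` command uses).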
class ArchiveRecreater:
    class Interrupted(Exception):
        def __init__(self, metadata=None):
            self.metadata = metadata or {}

    @staticmethod
    def is_temporary_archive(archive_name):
        return archive_name.endswith('.recreate')

    def __init__(self, repository, manifest, key, cache, matcher,
                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
                 chunker_params=None, compression=None, compression_files=None, always_recompress=False,
                 dry_run=False, stats=False, progress=False, file_status_printer=None,
                 checkpoint_interval=1800):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.cache = cache

        self.matcher = matcher
        self.exclude_caches = exclude_caches
        self.exclude_if_present = exclude_if_present or []
        self.keep_exclude_tags = keep_exclude_tags

        self.rechunkify = chunker_params is not None
        if self.rechunkify:
            logger.debug('Rechunking archives to %s', chunker_params)
        self.chunker_params = chunker_params or CHUNKER_PARAMS

        self.recompress = bool(compression)
        self.always_recompress = always_recompress
        self.compression = compression or CompressionSpec('none')
        self.seen_chunks = set()
        self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
                                                        compression_files or [])
        key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))

        self.dry_run = dry_run
        self.stats = stats
        self.progress = progress
        self.print_file_status = file_status_printer or (lambda *args: None)
        self.checkpoint_interval = None if dry_run else checkpoint_interval
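
    # Typical use (sketch; the surrounding repository/manifest/key/cache/matcher objects
    # are created elsewhere, e.g. by the `borg recreate` command handler):
    #
    #     recreater = ArchiveRecreater(repository, manifest, key, cache, matcher,
    #                                  chunker_params=CHUNKER_PARAMS,
    #                                  compression=CompressionSpec('lz4'))
    #     recreater.recreate('my-archive', comment='rechunked')
    #
    # With target_name=None (the default) the recreated archive replaces the original
    # under its old name, see save() below.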

    def recreate(self, archive_name, comment=None, target_name=None):
        assert not self.is_temporary_archive(archive_name)
        archive = self.open_archive(archive_name)
        target = self.create_target(archive, target_name)
        if self.exclude_if_present or self.exclude_caches:
            self.matcher_add_tagged_dirs(archive)
        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
            logger.info("Skipping archive %s, nothing to do", archive_name)
            return
        self.process_items(archive, target)
        replace_original = target_name is None
        self.save(archive, target, comment, replace_original=replace_original)

    def process_items(self, archive, target):
        matcher = self.matcher
        target_is_subset = not matcher.empty()
        hardlink_masters = {} if target_is_subset else None

        def item_is_hardlink_master(item):
            return (target_is_subset and
                    stat.S_ISREG(item.mode) and
                    item.get('hardlink_master', True) and
                    'source' not in item and
                    not matcher.match(item.path))
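
        # When only a subset of the archive is recreated, hardlink masters that fall outside
        # that subset are remembered here so that the first included hardlink pointing at them
        # can take over their chunks and become the new master.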

        for item in archive.iter_items():
            if item_is_hardlink_master(item):
                hardlink_masters[item.path] = (item.get('chunks'), None)
                continue
            if not matcher.match(item.path):
                self.print_file_status('x', item.path)
                continue
            if target_is_subset and stat.S_ISREG(item.mode) and item.get('source') in hardlink_masters:
                # master of this hard link is outside the target subset
                chunks, new_source = hardlink_masters[item.source]
                if new_source is None:
                    # First item to use this master, move the chunks
                    item.chunks = chunks
                    hardlink_masters[item.source] = (None, item.path)
                    del item.source
                else:
                    # Master was already moved, only update this item's source
                    item.source = new_source
            if self.dry_run:
                self.print_file_status('-', item.path)
            else:
                self.process_item(archive, target, item)
        if self.progress:
            target.stats.show_progress(final=True)

    def process_item(self, archive, target, item):
        if 'chunks' in item:
            self.process_chunks(archive, target, item)
            target.stats.nfiles += 1
        target.add_item(item)
        self.print_file_status(file_status(item.mode), item.path)
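
    # process_chunks has a fast path: if neither recompression nor rechunking is requested,
    # the existing chunks are only reference-counted in the cache. Otherwise the file data is
    # streamed through iter_chunks() and re-written via chunk_processor().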
    def process_chunks(self, archive, target, item):
        if not self.recompress and not target.recreate_rechunkify:
            for chunk_id, size, csize in item.chunks:
                self.cache.chunk_incref(chunk_id, target.stats)
            return item.chunks
        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
        compress = self.compression_decider1.decide(item.path)
        chunk_processor = partial(self.chunk_processor, target, compress)
        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
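
    # chunk_processor deduplicates within this recreate run via seen_chunks and, when
    # recompressing, skips chunks that are already stored with the desired compression
    # (unless always_recompress is set).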
    def chunk_processor(self, target, compress, data):
        chunk_id = self.key.id_hash(data)
        if chunk_id in self.seen_chunks:
            return self.cache.chunk_incref(chunk_id, target.stats)
        chunk = Chunk(data, compress=compress)
        compression_spec, chunk = self.key.compression_decider2.decide(chunk)
        overwrite = self.recompress
        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
            # Check if this chunk is already compressed the way we want it
            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
            if Compressor.detect(old_chunk.data).name == compression_spec['name']:
                # Stored chunk has the same compression we wanted
                overwrite = False
        chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
        self.seen_chunks.add(chunk_entry.id)
        return chunk_entry

    def iter_chunks(self, archive, target, chunks):
        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
        if target.recreate_rechunkify:
            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
            # (does not load the entire file into memory)
            file = ChunkIteratorFileWrapper(chunk_iterator)
            yield from target.chunker.chunkify(file)
        else:
            for chunk in chunk_iterator:
                yield chunk.data

    def save(self, archive, target, comment=None, replace_original=True):
        if self.dry_run:
            return
        timestamp = archive.ts.replace(tzinfo=None)
        if comment is None:
            comment = archive.metadata.get('comment', '')
        target.save(timestamp=timestamp, comment=comment, additional_metadata={
            'cmdline': archive.metadata.cmdline,
            'recreate_cmdline': sys.argv,
        })
        if replace_original:
            archive.delete(Statistics(), progress=self.progress)
            target.rename(archive.name)
        if self.stats:
            target.end = datetime.utcnow()
            log_multi(DASHES,
                      str(target),
                      DASHES,
                      str(target.stats),
                      str(self.cache),
                      DASHES)

    def matcher_add_tagged_dirs(self, archive):
        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
        def exclude(dir, tag_item):
            if self.keep_exclude_tags:
                tag_files.append(PathPrefixPattern(tag_item.path))
                tagged_dirs.append(FnmatchPattern(dir + '/'))
            else:
                tagged_dirs.append(PathPrefixPattern(dir))

        matcher = self.matcher
        tag_files = []
        tagged_dirs = []
        # build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
        cachedir_masters = {}

        for item in archive.iter_items(
                filter=lambda item: item.path.endswith(CACHE_TAG_NAME) or matcher.match(item.path)):
            if item.path.endswith(CACHE_TAG_NAME):
                cachedir_masters[item.path] = item
            dir, tag_file = os.path.split(item.path)
            if tag_file in self.exclude_if_present:
                exclude(dir, item)
            if stat.S_ISREG(item.mode):
                if self.exclude_caches and tag_file == CACHE_TAG_NAME:
                    if 'chunks' in item:
                        file = open_item(archive, item)
                    else:
                        file = open_item(archive, cachedir_masters[item.source])
                    if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
                        exclude(dir, item)
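
        # Tag files stay included (match value True) while everything below the tagged
        # directories is excluded (match value False); process_items() skips any item
        # for which matcher.match() returns False.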
        matcher.add(tag_files, True)
        matcher.add(tagged_dirs, False)

    def create_target(self, archive, target_name=None):
        """Create target archive."""
        target_name = target_name or archive.name + '.recreate'
        target = self.create_target_archive(target_name)
        # If the archives use the same chunker params, then don't rechunkify
        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
        if target.recreate_rechunkify:
            logger.debug('Rechunking archive from %s to %s',
                         source_chunker_params or '(unknown)', target.chunker_params)
        return target

    def create_target_archive(self, name):
        target = Archive(self.repository, self.key, self.manifest, name, create=True,
                         progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
                         checkpoint_interval=self.checkpoint_interval, compression=self.compression)
        return target

    def open_archive(self, name, **kwargs):
        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)