from binascii import hexlify
from contextlib import contextmanager
from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .repository import Repository

import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (12, 16, 14, HASH_WINDOW_SIZE)

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, os_error):
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        return str(self.os_error)


@contextmanager
def backup_io():
    """Context manager changing OSError to BackupOSError."""
    try:
        yield
    except OSError as os_error:
        raise BackupOSError(os_error) from os_error


def backup_io_iter(iterator):
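    """Yield items from *iterator*, converting any OSError raised while iterating into BackupOSError."""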
    while True:
        try:
            with backup_io():
                item = next(iterator)
        except StopIteration:
            return
        yield item


class DownloadPipeline:
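    """Fetch, decrypt and unpack item metadata from the repository,
    optionally preloading the referenced file content chunks.
    """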

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
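    """Buffer item dicts, pack them with msgpack and cut the resulting metadata
    stream into chunks (storing a chunk is left to the write_chunk implementation).
    """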
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
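        """Fetch, decrypt and unpack the archive metadata stored under *id*."""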
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - self.manifest.item_keys
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
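        """Extract the archive entry *item* below self.cwd.

        With dry_run or stdout, nothing is created on the filesystem.
        """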
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                with backup_io():
                    os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                with backup_io():
                    if os.path.exists(path):
                        os.unlink(path)
                    os.link(source, path)
            else:
                with backup_io():
                    fd = open(path, 'wb')
                with fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        with backup_io():
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io():
                        pos = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
            return
        with backup_io():
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                if not os.path.exists(path):
                    os.makedirs(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item[b'source']
                if os.path.exists(path):
                    os.unlink(path)
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                os.mknod(path, item[b'mode'], item[b'rdev'])
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        def chunk_decref(id, stats):
            nonlocal error
            try:
                self.cache.chunk_decref(id, stats)
            except KeyError:
                cid = hexlify(id).decode('ascii')
                raise ChunksIndexError(cid)
            except Repository.ObjectNotFound as e:
                # object not in repo - strange, but we wanted to delete it anyway.
                if not forced:
                    raise
                error = True

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata[b'items']
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                unpacker.feed(self.key.decrypt(items_id, data))
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        if b'chunks' in item:
                            for chunk_id, size, csize in item[b'chunks']:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if not forced:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if not forced:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    def stat_attrs(self, st, path):
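        """Build an item metadata dict (ownership, mode, timestamps, xattrs, ACLs,
        bsdflags) from the stat result *st* for *path*.
        """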
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'atime': int_to_bigint(st.st_atime_ns),
            b'ctime': int_to_bigint(st.st_ctime_ns),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        with backup_io():
            xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        with backup_io():
            acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
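        """Chunk and store data read from stdin and add a synthetic regular-file item for it."""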
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
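        """Back up a regular file, using the files cache to re-use chunks of files
        that are known and unchanged; also handles hard links and (in --read-special
        mode) special files read like regular files.
        """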
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        is_special_file = is_special(st.st_mode)
        if not is_special_file:
            path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            with backup_io():
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        if is_special_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in FUSE mount like a regular file:
            item[b'mode'] = stat.S_IFREG | stat.S_IMODE(item[b'mode'])
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = frozenset([b'path', b'source', b'rdev', b'chunks',
                       b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                       b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])

# this is the set of keys that are always present in items:
REQUIRED_ITEM_KEYS = frozenset([b'path', b'mtime', ])

# this set must be kept complete, otherwise rebuild_manifest might malfunction:
ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'hostname', b'username', b'time', b'time_end', ])

# this is the set of keys that are always present in archives:
REQUIRED_ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'time', ])


def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


class ArchiveChecker:
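    """Check archive metadata and file chunks for consistency and, in repair mode,
    fix what can be fixed (rebuild the manifest, replace missing file chunks with zeros).
    """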

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
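        """Instantiate the key type used by this repository by looking at one stored chunk."""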
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return REQUIRED_ARCHIVE_KEYS.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on a older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = self.manifest.item_keys
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = hexlify(chunk_id).decode('ascii')
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False
                keys = set(obj)
                return REQUIRED_ITEM_KEYS.issubset(keys) and keys.issubset(item_keys)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            if valid_item(item):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
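        """Report chunks not referenced by any archive; in repair mode, delete all unreferenced chunks."""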
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
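        """In repair mode, write the updated manifest and commit the repository."""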
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)