from binascii import hexlify
from contextlib import contextmanager
from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (12, 16, 14, HASH_WINDOW_SIZE)
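
# Illustrative note (not part of the original module): a chunker-params tuple is consumed as
# Chunker(seed, chunk_min_exp, chunk_max_exp, hash_mask_bits, hash_window_size), as seen in
# the constructor calls further down in this file, e.g.:
#
#     chunker = Chunker(0, *CHUNKER_PARAMS)      # seed 0 only for illustration
#     chunks = [bytes(c) for c in chunker.chunkify(open('/some/file', 'rb'))]  # path is a placeholder
#
# With HASH_MASK_BITS = 21, cut points occur roughly every 2**21 bytes on average, and chunk
# sizes are clamped to the range [2**CHUNK_MIN_EXP, 2**CHUNK_MAX_EXP].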

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, os_error):
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        return str(self.os_error)


@contextmanager
def backup_io():
    """Context manager changing OSError to BackupOSError."""
    try:
        yield
    except OSError as os_error:
        raise BackupOSError(os_error) from os_error


def backup_io_iter(iterator):
    while True:
        try:
            with backup_io():
                item = next(iterator)
        except StopIteration:
            return
        yield item
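
# Usage sketch (illustrative only, not part of the original source): callers wrap just the
# filesystem accesses, so a failing input file becomes a warning instead of aborting the run:
#
#     try:
#         with backup_io():
#             data = open(path, 'rb').read()      # 'path' is a placeholder
#     except BackupOSError as err:
#         logger.warning('skipping %s: %s', path, err)
#
# backup_io_iter() applies the same wrapping to every next() call of an iterator, e.g.
# for chunk in backup_io_iter(chunker.chunkify(fd)): ...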

class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
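
# Behavior sketch (illustrative only, not part of the original source): subclasses override
# write_chunk() to decide where chunks go. Items are msgpacked into the buffer; once it
# exceeds BUFFER_SIZE it is chunked and written out, keeping the last partial chunk buffered
# until flush(flush=True). A hypothetical in-memory subclass could look like:
#
#     class ListChunkBuffer(ChunkBuffer):          # hypothetical, for illustration
#         def __init__(self, key):
#             super().__init__(key)
#             self.out = []
#         def write_chunk(self, chunk):
#             self.out.append(chunk)
#             return self.key.id_hash(chunk)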

class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - self.manifest.item_keys
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                with backup_io():
                    os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                with backup_io():
                    if os.path.exists(path):
                        os.unlink(path)
                    os.link(source, path)
            else:
                with backup_io():
                    fd = open(path, 'wb')
                with fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        with backup_io():
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io():
                        pos = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
            return
        with backup_io():
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                if not os.path.exists(path):
                    os.makedirs(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item[b'source']
                if os.path.exists(path):
                    os.unlink(path)
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                os.mknod(path, item[b'mode'], item[b'rdev'])
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown,
        # since they include the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats, progress=False):
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'atime': int_to_bigint(st.st_atime_ns),
            b'ctime': int_to_bigint(st.st_ctime_ns),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        with backup_io():
            xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        with backup_io():
            acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        is_regular_file = stat.S_ISREG(st.st_mode)
        if is_regular_file:
            path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            with backup_io():
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            if is_regular_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        if not is_regular_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in fuse mount like a regular file:
            item[b'mode'] = stat.S_IFREG | stat.S_IMODE(item[b'mode'])
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = frozenset([b'path', b'source', b'rdev', b'chunks',
                       b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                       b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])

# this is the set of keys that are always present in items:
REQUIRED_ITEM_KEYS = frozenset([b'path', b'mtime', ])

# this set must be kept complete, otherwise rebuild_manifest might malfunction:
ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'hostname', b'username', b'time', b'time_end', ])

# this is the set of keys that are always present in archives:
REQUIRED_ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'time', ])

def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
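
# Example (illustrative only, not part of the original source; assumes the msgpack defaults
# this module was written against, where bytes pack as raw/str types): this check is a cheap
# pre-filter run on decrypted chunks before attempting a full msgpack.unpackb() during repair.
#
#     keys = [msgpack.packb(k) for k in (b'path', b'mode')]
#     valid_msgpacked_dict(msgpack.packb({b'path': b'x'}), keys)    # -> True
#     valid_msgpacked_dict(b'\x00garbage', keys)                    # -> False
#
# It only inspects the map header and the first key, so it is safe to call on every chunk.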

class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
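
# Resync sketch (illustrative only, not part of the original source): the checker feeds item
# metadata chunks into a single unpacker; after a gap (missing chunk) it calls resync(), and
# subsequently fed data is buffered and scanned byte by byte until something that validates as
# an item dict is found:
#
#     unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, ITEM_KEYS)
#     unpacker.feed(good_data)                # 'good_data' is a placeholder
#     unpacker.resync()                       # a chunk was missing in between
#     unpacker.feed(possibly_torn_data)       # scanned for the next plausible item start
#     items = list(unpacker)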

class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return REQUIRED_ARCHIVE_KEYS.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(
                        item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = self.manifest.item_keys
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = hexlify(chunk_id).decode('ascii')
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False
                keys = set(obj)
                return REQUIRED_ITEM_KEYS.issubset(keys) and keys.issubset(item_keys)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            if valid_item(item):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)