from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, bin_to_hex, \
    parse_timestamp, to_localtime, ISO_FORMAT, format_time, format_timedelta, remove_surrogates, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent, IntegrityError, set_ec, EXIT_WARNING, safe_ns
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .repository import Repository, LIST_SCAN_LIMIT

import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (15, 19, 17, HASH_WINDOW_SIZE)
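# both tuples follow the same layout: (minimum chunk size exponent, maximum chunk size exponent,
# hash mask bits, hash window size); the items metadata stream just uses smaller exponents.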
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, os_error):
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        return str(self.os_error)


@contextmanager
def backup_io():
    """Context manager changing OSError to BackupOSError."""
    try:
        yield
    except OSError as os_error:
        raise BackupOSError(os_error) from os_error


def backup_io_iter(iterator):
    while True:
        try:
            with backup_io():
                item = next(iterator)
        except StopIteration:
            return
        yield item


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=1800, numeric_owner=False, noatime=False, noctime=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
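            # find an unused checkpoint archive name: "<name>.checkpoint", then
            # "<name>.checkpoint.1", "<name>.checkpoint.2", ... until one is free.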
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = None

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data, unicode_errors='surrogateescape')
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - self.manifest.item_keys
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.monotonic()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = datetime.utcnow()
            start = end - duration
        else:
            end = timestamp + duration
            start = timestamp
        self.start = start
        self.end = end
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.strftime(ISO_FORMAT),
            'time_end': end.strftime(ISO_FORMAT),
        })
        data = self.key.pack_and_authenticate_metadata(metadata, context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        has_damaged_chunks = b'chunks_healthy' in item
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item[b'path']))
            return

        dest = self.cwd
        if item[b'path'].startswith(('/', '../')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item[b'mode']
        if stat.S_ISREG(mode):
            with backup_io():
                make_parent(path)
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                with backup_io():
                    os.link(source, path)
            else:
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * (1 << self.chunker_params[1])
                with backup_io():
                    fd = open(path, 'wb')
                with fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        with backup_io():
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io():
                        pos = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item[b'path']))
            return
        with backup_io():
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item[b'source']
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                os.mknod(path, item[b'mode'], item[b'rdev'])
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item[b'mode'])
    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.chown(path, uid, gid, follow_symlinks=False)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno == errno.E2BIG:
                    # xattr is too big
                    logger.warning('%s: Value or key of extended attribute %s is too big for this filesystem' %
                                   (path, k.decode()))
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.ENOTSUP:
                    # xattrs not supported here
                    logger.warning('%s: Extended attributes are not supported on this filesystem' % path)
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.EACCES:
                    # permission denied to set this specific xattr (this may happen related to security.* keys)
                    logger.warning('%s: Permission denied when setting extended attribute %s' % (path, k.decode()))
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.ENOSPC:
                    # no space left on device while setting this specific xattr
                    # ext4 reports ENOSPC when trying to set an xattr with >4kiB while ext4 can only support 4kiB xattrs
                    # (in this case, this is NOT a "disk full" error, just an ext4 limitation).
                    logger.warning('%s: No space left on device while setting extended attribute %s (len = %d)' % (
                                   path, k.decode(), len(v)))
                    set_ec(EXIT_WARNING)
                else:
                    raise
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        def chunk_decref(id, stats):
            nonlocal error
            try:
                self.cache.chunk_decref(id, stats)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            except Repository.ObjectNotFound as e:
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata[b'items']
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                unpacker.feed(self.key.decrypt(items_id, data))
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        if b'chunks' in item:
                            for chunk_id, size, csize in item[b'chunks']:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')
    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(safe_ns(st.st_mtime_ns)),
        }
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            item[b'atime'] = int_to_bigint(safe_ns(st.st_atime_ns))
        if not self.noctime:
            item[b'ctime'] = int_to_bigint(safe_ns(st.st_ctime_ns))
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        with backup_io():
            xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        with backup_io():
            acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        # note: we can not support hardlinked symlinks,
        # due to the dual-use of item[b'source'], see issue #2343:
        # hardlinked symlinks will be archived [and extracted] as non-hardlinked symlinks.
        with backup_io():
            source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin
    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
        is_special_file = is_special(st.st_mode)
        if not is_special_file:
            path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            with backup_io():
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        if is_special_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in FUSE mount like a regular file:
            item[b'mode'] = stat.S_IFREG | stat.S_IMODE(item[b'mode'])
        self.stats.nfiles += 1
        self.add_item(item)
        if st.st_nlink > 1 and source is None:
            # Add the hard link reference *after* the file has been added to the archive.
            self.hard_links[st.st_ino, st.st_dev] = safe_path
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = frozenset([b'path', b'source', b'rdev', b'chunks', b'chunks_healthy',
                       b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                       b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])

# this is the set of keys that are always present in items:
REQUIRED_ITEM_KEYS = frozenset([b'path', b'mtime', ])

# this set must be kept complete, otherwise rebuild_manifest might malfunction:
ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'hostname', b'username', b'time', b'time_end', ])

# this is the set of keys that are always present in archives:
REQUIRED_ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'time', ])
def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()
class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
            except IntegrityError as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)
    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return REQUIRED_ARCHIVE_KEYS.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                name = archive[b'name'].decode()
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest
    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present.

            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
            """
            def replacement_chunk(size):
                data = bytes(size)
                chunk_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                csize = len(cdata)
                return chunk_id, size, csize, cdata

            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = b'chunks_healthy' in item
            chunks_current = item[b'chunks']
            chunks_healthy = item[b'chunks_healthy'] if has_chunks_healthy else chunks_current
            assert len(chunks_current) == len(chunks_healthy)
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(
                                     item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        self.error_found = chunks_replaced = True
                        chunk_id, size, csize, cdata = replacement_chunk(size)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). '
                                    'It has an all-zero replacement chunk already.'.format(
                                    item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        if chunk_id in self.chunks:
                            add_reference(chunk_id, size, csize)
                        else:
                            logger.warning('{}: Missing all-zero replacement chunk detected (Byte {}-{}). '
                                           'Generating new replacement chunk.'.format(
                                           item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                            self.error_found = chunks_replaced = True
                            chunk_id, size, csize, cdata = replacement_chunk(size)
                            add_reference(chunk_id, size, csize, cdata)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: Healed previously missing file chunk! (Byte {}-{}).'.format(
                                    item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item[b'chunks_healthy'] = item[b'chunks']
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: Completely healed previously damaged file!'.format(
                            item[b'path'].decode('utf-8', 'surrogateescape')))
                del item[b'chunks_healthy']
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = self.manifest.item_keys
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, item_keys)
            _state = 0
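            # missing_chunk_detector increments _state whenever the items stream switches between
            # present and missing chunks, so groupby() below yields alternating runs of chunk IDs;
            # runs with an odd state consist entirely of missing item metadata chunks.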
            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join((k.decode() if isinstance(k, bytes) else str(k) for k in keys))

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not REQUIRED_ITEM_KEYS.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(REQUIRED_ITEM_KEYS - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
                    except RobustUnpacker.UnpackerCrashed as err:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
                if not archive_items:
                    logger.warning('--prefix %s does not match any archives', prefix)
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
            if last is not None and end < last:
                logger.warning('--last %d archives: only found %d archives', last, end)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
            if not archive_items:
                logger.error('Archive %s does not exist', archive)
                self.error_found = True
                return

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id
    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)