# archive.py

from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, bin_to_hex, \
    parse_timestamp, to_localtime, format_time, format_timedelta, remove_surrogates, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent, IntegrityError
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .repository import Repository

import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (15, 19, 17, HASH_WINDOW_SIZE)
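
# Worked example (derived from the comments above): for file data the tuple
# (19, 23, 21, 0xfff) means chunks between 2**19 = 512 KiB and 2**23 = 8 MiB,
# cut where 21 bits of the rolling hash (window 4095 bytes) match, i.e. roughly
# every 2**21 = 2 MiB on average. The items metadata stream uses (15, 19, 17, 0xfff),
# i.e. 32 KiB .. 512 KiB chunks with an average around 2**17 = 128 KiB.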

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, os_error):
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        return str(self.os_error)


@contextmanager
def backup_io():
    """Context manager changing OSError to BackupOSError."""
    try:
        yield
    except OSError as os_error:
        raise BackupOSError(os_error) from os_error


def backup_io_iter(iterator):
    while True:
        try:
            with backup_io():
                item = next(iterator)
        except StopIteration:
            return
        yield item
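
# Illustrative usage sketch (not part of the original code): only the filesystem
# access is wrapped, so an OSError on an input file is downgraded to a warning,
# while repository IO outside the context manager stays critical:
#
#     with backup_io():
#         fd = open(path, 'rb')          # OSError here -> BackupOSError (warning)
#     data = repository.get(chunk_id)    # repository errors remain fatal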


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
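
# Note: ChunkBuffer.write_chunk is the extension point. CacheChunkBuffer stores
# chunks through the cache and returns their IDs, while ArchiveChecker.rebuild_refcounts
# below rebinds write_chunk to its add_callback so repaired item streams are
# written (and reference-counted) directly.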


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, noatime=False, noctime=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])
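
    # Checkpoint naming (see the while loop above): '.checkpoint' is appended to
    # the archive name, then '.checkpoint.1', '.checkpoint.2', ... until a name
    # not yet present in the manifest is found.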

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data, unicode_errors='surrogateescape')
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - self.manifest.item_keys
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.monotonic()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)
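
    # A checkpoint is a full save() under checkpoint_name; afterwards the manifest
    # entry and the metadata chunk reference are dropped again in memory, so the
    # next checkpoint (or the final save) supersedes it instead of conflicting
    # with it.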

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            self.end = datetime.utcnow()
            self.start = self.end - duration
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            self.start = timestamp - duration
            end = timestamp
            start = self.start
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
        })
        data = self.key.pack_and_authenticate_metadata(metadata, context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
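
    # calc_stats decrements a scratch copy of the refcounts inside a cache
    # transaction: a chunk whose count was 1 before decrementing is referenced
    # only by this archive, which appears to be what the count == 1 flag passed
    # to stats.update records. The transaction is rolled back, so the real
    # refcounts are untouched.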

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        has_damaged_chunks = b'chunks_healthy' in item
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            if has_damaged_chunks:
                logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                               remove_surrogates(item[b'path']))
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                with backup_io():
                    os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                with backup_io():
                    if os.path.exists(path):
                        os.unlink(path)
                    os.link(source, path)
            else:
                with backup_io():
                    fd = open(path, 'wb')
                with fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        with backup_io():
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io():
                        pos = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item[b'path']))
            return
        with backup_io():
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                if not os.path.exists(path):
                    os.makedirs(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item[b'source']
                if os.path.exists(path):
                    os.unlink(path)
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                os.mknod(path, item[b'mode'], item[b'rdev'])
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item[b'mode'])
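
    # Sparse extraction note: self.zeros is a buffer of max-chunk-size zero bytes,
    # so self.zeros.startswith(data) is a cheap test for an all-zero chunk; such
    # chunks become holes via fd.seek() instead of being written out.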

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        def chunk_decref(id, stats):
            nonlocal error
            try:
                self.cache.chunk_decref(id, stats)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            except Repository.ObjectNotFound as e:
                # object not in repo - strange, but we wanted to delete it anyway.
                if not forced:
                    raise
                error = True

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata[b'items']
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                unpacker.feed(self.key.decrypt(items_id, data))
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        if b'chunks' in item:
                            for chunk_id, size, csize in item[b'chunks']:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if not forced:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if not forced:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            item[b'atime'] = int_to_bigint(st.st_atime_ns)
        if not self.noctime:
            item[b'ctime'] = int_to_bigint(st.st_ctime_ns)
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        with backup_io():
            xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        with backup_io():
            acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        with backup_io():
            source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin
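
    # process_stdin archives data read from standard input as a single regular
    # file item (mode 0o100660 == S_IFREG | 0o660), presumably used when the
    # command line gives '-' instead of a path.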

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
        is_special_file = is_special(st.st_mode)
        if not is_special_file:
            path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            with backup_io():
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        if is_special_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in FUSE mount like a regular file:
            item[b'mode'] = stat.S_IFREG | stat.S_IMODE(item[b'mode'])
        self.stats.nfiles += 1
        self.add_item(item)
        if st.st_nlink > 1 and source is None:
            # Add the hard link reference *after* the file has been added to the archive.
            self.hard_links[st.st_ino, st.st_dev] = safe_path
        return status
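
    # Status letters returned above: 'h' hardlink to an already seen inode,
    # 'U' unchanged (files cache entry valid and all chunks still present),
    # 'A' added (no usable files cache entry), 'M' modified (entry existed but
    # the file had to be re-chunked).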

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = frozenset([b'path', b'source', b'rdev', b'chunks', b'chunks_healthy',
                       b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                       b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])

# this is the set of keys that are always present in items:
REQUIRED_ITEM_KEYS = frozenset([b'path', b'mtime', ])

# this set must be kept complete, otherwise rebuild_manifest might malfunction:
ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'hostname', b'username', b'time', b'time_end', ])

# this is the set of keys that are always present in archives:
REQUIRED_ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'time', ])


def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
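
# Worked example of the byte checks above: msgpack encodes a small dict such as
# {b'path': ...} as 0x81 (fixmap, one entry) followed by the first key, e.g.
# 0xa4 'path' (fixstr of length 4). Hence d[0] & 0xf0 == 0x80 detects a fixmap,
# 0xde a map16, and d[offs] & 0xe0 == 0xa0 (or 0xd9/0xda/0xdb) a string key,
# which is then matched against the serialized known key names.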


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()
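
# Resync strategy: while resyncing, __next__ shifts the buffered data one byte at
# a time until it both looks like a msgpacked item dict (valid_msgpacked_dict)
# and actually unpacks into something the validator accepts; only then does
# normal streaming unpacking resume.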


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, key=self.key)
            except IntegrityError as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return REQUIRED_ARCHIVE_KEYS.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                name = archive[b'name'].decode()
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest
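
    # The b'\xa7version\x01' test in rebuild_manifest matches the raw msgpack
    # encoding of the pair 'version': 1 (0xa7 = fixstr of length 7, 0x01 =
    # positive fixint 1), a cheap way to recognize candidate archive metadata
    # chunks before fully unpacking them.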

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present.

            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
            """
            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = b'chunks_healthy' in item
            chunks_current = item[b'chunks']
            chunks_healthy = item[b'chunks_healthy'] if has_chunks_healthy else chunks_current
            assert len(chunks_current) == len(chunks_healthy)
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(
                                     item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        self.error_found = chunks_replaced = True
                        data = bytes(size)
                        chunk_id = self.key.id_hash(data)
                        cdata = self.key.encrypt(data)
                        csize = len(cdata)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). '
                                    'It has an all-zero replacement chunk already.'.format(
                                    item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        add_reference(chunk_id, size, csize)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: Healed previously missing file chunk! (Byte {}-{}).'.format(
                                    item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item[b'chunks_healthy'] = item[b'chunks']
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: Completely healed previously damaged file!'.format(
                            item[b'path'].decode('utf-8', 'surrogateescape')))
                del item[b'chunks_healthy']
            item[b'chunks'] = chunk_list
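
        # Healing workflow: on the first repair, the original chunk list is kept in
        # b'chunks_healthy' and missing chunks are replaced by all-zero chunks of the
        # same size; if every healthy chunk later reappears, the item is restored to
        # its original chunk list and b'chunks_healthy' is dropped again.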

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = self.manifest.item_keys
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join((k.decode() if isinstance(k, bytes) else str(k) for k in keys))

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not REQUIRED_ITEM_KEYS.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(REQUIRED_ITEM_KEYS - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
                    except RobustUnpacker.UnpackerCrashed as err:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
                if not archive_items:
                    logger.warning('--prefix %s does not match any archives', prefix)
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
            if last is not None and end < last:
                logger.warning('--last %d archives: only found %d archives', last, end)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
            if not archive_items:
                logger.error('Archive %s does not exist', archive)
                self.error_found = True
                return

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)