archive.py
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote

import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, bin_to_hex, \
    parse_timestamp, to_localtime, format_time, format_timedelta, remove_surrogates, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent, IntegrityError, set_ec, EXIT_WARNING, safe_ns
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .repository import Repository, LIST_SCAN_LIMIT
import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (15, 19, 17, HASH_WINDOW_SIZE)

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)
class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, os_error):
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        return str(self.os_error)


@contextmanager
def backup_io():
    """Context manager changing OSError to BackupOSError."""
    try:
        yield
    except OSError as os_error:
        raise BackupOSError(os_error) from os_error


def backup_io_iter(iterator):
    while True:
        try:
            with backup_io():
                item = next(iterator)
        except StopIteration:
            return
        yield item
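# Descriptive note (added): DownloadPipeline fetches and decrypts item metadata chunks from
# the repository; unpack_many() turns an item-stream chunk ID list back into item dicts and
# can optionally preload the data chunks of the yielded items.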
class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
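# Descriptive note (added): ChunkBuffer accumulates msgpacked items in an in-memory buffer and
# cuts that buffer into chunks with the items-metadata chunker; write_chunk() is supplied by
# subclasses (CacheChunkBuffer stores the chunk via the cache and returns its ID).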
class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
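# Descriptive note (added): Archive represents one archive in the repository. It is either
# created (create=True), buffering item metadata via CacheChunkBuffer and writing periodic
# checkpoints, or loaded from its manifest entry for listing/extraction.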
class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=1800, numeric_owner=False, noatime=False, noctime=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = None
    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data, unicode_errors='surrogateescape')
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end): {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name
    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - self.manifest.item_keys
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.monotonic()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            self.end = datetime.utcnow()
            self.start = self.end - duration
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            self.start = timestamp - duration
            end = timestamp
            start = self.start
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
        })
        data = self.key.pack_and_authenticate_metadata(metadata, context=b'archive')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
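    # Descriptive note (added): extract_item() restores a single archive item below the current
    # working directory. dry_run only fetches the data, stdout writes file contents to sys.stdout,
    # sparse turns all-zero chunks into holes. Destination IO errors are wrapped in BackupOSError
    # via backup_io().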
    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        has_damaged_chunks = b'chunks_healthy' in item
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            if has_damaged_chunks:
                logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                               remove_surrogates(item[b'path']))
            return
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item[b'mode']
        if stat.S_ISREG(mode):
            with backup_io():
                make_parent(path)
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                with backup_io():
                    os.link(source, path)
            else:
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * (1 << self.chunker_params[1])
                with backup_io():
                    fd = open(path, 'wb')
                with fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        with backup_io():
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io():
                        pos = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if has_damaged_chunks:
                    logger.warning('File %s has damaged (all-zero) chunks. Try running borg check --repair.' %
                                   remove_surrogates(item[b'path']))
            return
        with backup_io():
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item[b'source']
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                os.mkfifo(path)
                self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                os.mknod(path, item[b'mode'], item[b'rdev'])
                self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item[b'mode'])
    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.chown(path, uid, gid, follow_symlinks=False)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
        # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
        # the Linux capabilities in the "security.capability" attribute.
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno == errno.E2BIG:
                    # xattr is too big
                    logger.warning('%s: Value or key of extended attribute %s is too big for this filesystem' %
                                   (path, k.decode()))
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.ENOTSUP:
                    # xattrs not supported here
                    logger.warning('%s: Extended attributes are not supported on this filesystem' % path)
                    set_ec(EXIT_WARNING)
                elif e.errno == errno.EACCES:
                    # permission denied to set this specific xattr (this may happen related to security.* keys)
                    logger.warning('%s: Permission denied when setting extended attribute %s' % (path, k.decode()))
                    set_ec(EXIT_WARNING)
                else:
                    raise
    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]
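    # Descriptive note (added): delete() decrements the refcounts of all item metadata chunks and
    # file data chunks, then the archive metadata chunk itself, and drops the manifest entry.
    # With forced deletion, errors from corrupted or missing metadata are tolerated so that at
    # least the manifest entry gets removed.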
    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        def chunk_decref(id, stats):
            nonlocal error
            try:
                self.cache.chunk_decref(id, stats)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            except Repository.ObjectNotFound as e:
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata[b'items']
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                unpacker.feed(self.key.decrypt(items_id, data))
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        if b'chunks' in item:
                            for chunk_id, size, csize in item[b'chunks']:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')
    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(safe_ns(st.st_mtime_ns)),
        }
        # borg can work with archives only having mtime (older attic archives do not have
        # atime/ctime). it can be useful to omit atime/ctime, if they change without the
        # file content changing - e.g. to get better metadata deduplication.
        if not self.noatime:
            item[b'atime'] = int_to_bigint(safe_ns(st.st_atime_ns))
        if not self.noctime:
            item[b'ctime'] = int_to_bigint(safe_ns(st.st_ctime_ns))
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        with backup_io():
            xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        with backup_io():
            acl_get(path, item, st, self.numeric_owner)
        return item
    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        with backup_io():
            source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink
    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in backup_io_iter(self.chunker.chunkify(fd)):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin
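    # Descriptive note (added): process_file() backs up one regular file: it handles hard links via
    # self.hard_links, consults the files cache (file_known_and_unchanged) to reuse existing chunks,
    # and otherwise chunks and stores the file contents. It returns a single-letter status
    # ('h', 'U', 'A', 'M') used for logging.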
    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
        is_special_file = is_special(st.st_mode)
        if not is_special_file:
            path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
            ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        else:
            # in --read-special mode, we may be called for special files.
            # there should be no information in the cache about special files processed in
            # read-special mode, but we better play safe as this was wrong in the past:
            path_hash = ids = None
        first_run = not cache.files
        if first_run:
            logger.debug('Processing files ...')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            with backup_io():
                fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in backup_io_iter(self.chunker.chunkify(fd, fh)):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            if not is_special_file:
                # we must not memorize special files, because the contents of e.g. a
                # block or char device will change without its mtime/size/inode changing.
                cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        if is_special_file:
            # we processed a special file like a regular file. reflect that in mode,
            # so it can be extracted / accessed in FUSE mount like a regular file:
            item[b'mode'] = stat.S_IFREG | stat.S_IMODE(item[b'mode'])
        self.stats.nfiles += 1
        self.add_item(item)
        if st.st_nlink > 1 and source is None:
            # Add the hard link reference *after* the file has been added to the archive.
            self.hard_links[st.st_ino, st.st_dev] = safe_path
        return status
    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)
# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = frozenset([b'path', b'source', b'rdev', b'chunks', b'chunks_healthy',
                       b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                       b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])

# this is the set of keys that are always present in items:
REQUIRED_ITEM_KEYS = frozenset([b'path', b'mtime', ])

# this set must be kept complete, otherwise rebuild_manifest might malfunction:
ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'hostname', b'username', b'time', b'time_end', ])

# this is the set of keys that are always present in archives:
REQUIRED_ARCHIVE_KEYS = frozenset([b'version', b'name', b'items', b'cmdline', b'time', ])
def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)
class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()
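# Descriptive note (added): ArchiveChecker builds a chunk index of the repository, loads or
# rebuilds the manifest, re-walks all archive metadata to rebuild reference counts (replacing
# missing file chunks in repair mode) and finally reports or deletes orphaned chunks.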
class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        if not self.chunks:
            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
            return False
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            try:
                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
            except IntegrityError as exc:
                logger.error('Repository manifest is corrupted: %s', exc)
                self.error_found = True
                del self.chunks[Manifest.MANIFEST_ID]
                self.manifest = self.rebuild_manifest()
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        try:
            some_chunkid, _ = next(self.chunks.iteritems())
        except StopIteration:
            # repo is completely empty, no chunks
            return None
        cdata = repository.get(some_chunkid)
        return key_factory(repository, cdata)
    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        def valid_archive(obj):
            if not isinstance(obj, dict):
                return False
            keys = set(obj)
            return REQUIRED_ARCHIVE_KEYS.issubset(keys)

        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. thus, it is not recommended to rebuild a
        # lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
        manifest = Manifest(self.key, self.repository)
        archive_keys_serialized = [msgpack.packb(name) for name in ARCHIVE_KEYS]
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            try:
                data = self.key.decrypt(chunk_id, cdata)
            except IntegrityError as exc:
                logger.error('Skipping corrupted chunk: %s', exc)
                self.error_found = True
                continue
            if not valid_msgpacked_dict(data, archive_keys_serialized):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if valid_archive(archive):
                name = archive[b'name'].decode()
                logger.info('Found archive %s', name)
                if name in manifest.archives:
                    i = 1
                    while True:
                        new_name = '%s.%d' % (name, i)
                        if new_name not in manifest.archives:
                            break
                        i += 1
                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
                    name = new_name
                manifest.archives[name] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest
    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                self.chunks.incref(id_)
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present.

            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
            """
            def replacement_chunk(size):
                data = bytes(size)
                chunk_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                csize = len(cdata)
                return chunk_id, size, csize, cdata

            offset = 0
            chunk_list = []
            chunks_replaced = False
            has_chunks_healthy = b'chunks_healthy' in item
            chunks_current = item[b'chunks']
            chunks_healthy = item[b'chunks_healthy'] if has_chunks_healthy else chunks_current
            assert len(chunks_current) == len(chunks_healthy)
            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
                chunk_id, size, csize = chunk_healthy
                if chunk_id not in self.chunks:
                    # a chunk of the healthy list is missing
                    if chunk_current == chunk_healthy:
                        logger.error('{}: New missing file chunk detected (Byte {}-{}). '
                                     'Replacing with all-zero chunk.'.format(
                                         item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        self.error_found = chunks_replaced = True
                        chunk_id, size, csize, cdata = replacement_chunk(size)
                        add_reference(chunk_id, size, csize, cdata)
                    else:
                        logger.info('{}: Previously missing file chunk is still missing (Byte {}-{}). '
                                    'It has an all-zero replacement chunk already.'.format(
                                        item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        chunk_id, size, csize = chunk_current
                        if chunk_id in self.chunks:
                            add_reference(chunk_id, size, csize)
                        else:
                            logger.warning('{}: Missing all-zero replacement chunk detected (Byte {}-{}). '
                                           'Generating new replacement chunk.'.format(
                                               item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                            self.error_found = chunks_replaced = True
                            chunk_id, size, csize, cdata = replacement_chunk(size)
                            add_reference(chunk_id, size, csize, cdata)
                else:
                    if chunk_current == chunk_healthy:
                        # normal case, all fine.
                        add_reference(chunk_id, size, csize)
                    else:
                        logger.info('{}: Healed previously missing file chunk! (Byte {}-{}).'.format(
                            item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                        add_reference(chunk_id, size, csize)
                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
                offset += size
            if chunks_replaced and not has_chunks_healthy:
                # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
                item[b'chunks_healthy'] = item[b'chunks']
            if has_chunks_healthy and chunk_list == chunks_healthy:
                logger.info('{}: Completely healed previously damaged file!'.format(
                    item[b'path'].decode('utf-8', 'surrogateescape')))
                del item[b'chunks_healthy']
            item[b'chunks'] = chunk_list
        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            item_keys = self.manifest.item_keys
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item, item_keys)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = bin_to_hex(chunk_id)
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            def list_keys_safe(keys):
                return ', '.join((k.decode() if isinstance(k, bytes) else str(k) for k in keys))

            def valid_item(obj):
                if not isinstance(obj, StableDict):
                    return False, 'not a dictionary'
                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
                obj.pop(b'acl', None)
                keys = set(obj)
                if not REQUIRED_ITEM_KEYS.issubset(keys):
                    return False, 'missing required keys: ' + list_keys_safe(REQUIRED_ITEM_KEYS - keys)
                if not keys.issubset(item_keys):
                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
                return True, ''

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            valid, reason = valid_item(item)
                            if valid:
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
                    except RobustUnpacker.UnpackerCrashed as err:
                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
                        unpacker.resync()
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1
        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
                if not archive_items:
                    logger.warning('--prefix %s does not match any archives', prefix)
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
            if last is not None and end < last:
                logger.warning('--last %d archives: only found %d archives', last, end)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
            if not archive_items:
                logger.error('Archive %s does not exist', archive)
                self.error_found = True
                return
        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id
    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.info('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)