from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import groupby
import errno
import shutil
import tempfile
from attic.key import key_factory
from attic.remote import cache_if_remote
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from attic import xattr
from attic.platform import acl_get, acl_set
from attic.chunker import Chunker
from attic.hashindex import ChunkIndex
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff
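# WINDOW_SIZE, CHUNK_MASK and CHUNK_MIN above parameterize the rolling-hash
# chunker (see attic.chunker.Chunker): the hash window size, the bit mask that
# decides where chunk boundaries are cut (16 bits set, giving chunks of very
# roughly 64 KiB on average), and the minimum chunk size.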

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
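
# ChunkBuffer accumulates msgpack-serialized item dicts in memory and cuts the
# resulting byte stream with the same rolling-hash chunker used for file data,
# so archive metadata is deduplicated just like file contents.  Subclasses
# decide where the cut chunks go by overriding write_chunk().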

class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
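
# An Archive is a named snapshot.  Its metadata (name, hostname, username,
# time, cmdline and the list of item chunk ids) is stored as a single
# msgpack-encoded chunk whose id is recorded in the manifest; the items
# themselves are streamed through the CacheChunkBuffer above.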

class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed)
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
        self.id = id
        data = self.key.decrypt(self.id, self.repository.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc) + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False):
        if dry_run:
            if b'chunks' in item:
                for _ in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    pass
            return
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        fd.write(data)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
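
    # restore_attrs reapplies metadata in this order: extended attributes,
    # ownership (numeric uid/gid, or resolved names unless numeric_owner is
    # set), permission bits, mtime, ACLs and finally BSD flags where the
    # platform supports them.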

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v, follow_symlinks=False)
                except OSError as e:
                    if e.errno != errno.ENOTSUP:
                        raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st)),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_item(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
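
    # process_file consults the files cache first: if the file is known and
    # unchanged and all of its chunks are still present in the repository, the
    # existing chunks are simply re-referenced (chunk_incref); otherwise the
    # file is read and re-chunked, and the result is memorized for the next run.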

    def process_file(self, path, st, cache):
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with Archive._open_rb(path, st) as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
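
    # Minimal usage sketch (assumes repository, key, manifest and cache objects
    # have already been set up elsewhere; names and paths are illustrative):
    #
    #     archive = Archive(repository, key, manifest, 'myhost-2014-01-01',
    #                       cache=cache, create=True)
    #     st = os.lstat('home/user/notes.txt')
    #     archive.process_file('home/user/notes.txt', st, cache)
    #     archive.save()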

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
        flags_noatime = None
        euid = None

        def open_simple(p, s):
            return open(p, 'rb')

        def open_noatime_if_owner(p, s):
            if s.st_uid == euid:
                return os.fdopen(os.open(p, flags_noatime), 'rb')
            else:
                return open(p, 'rb')

        def open_noatime(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fo = open(p, 'rb')
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
                return fo
            return os.fdopen(fd, 'rb')

        o_noatime = getattr(os, 'O_NOATIME', None)
        if o_noatime is not None:
            flags_noatime = os.O_RDONLY | getattr(os, 'O_BINARY', 0) | o_noatime
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)
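
# RobustUnpacker recovers from corrupted item streams: while resyncing it scans
# byte by byte for something that looks like the start of an item dict -- a
# msgpack fixmap header (0x80-0x8f) followed by a fixstr key (0xa0-0xbf) that
# matches one of the known item keys -- before handing the data to a fresh
# Unpacker and validating the decoded item.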

class RobustUnpacker():
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super(RobustUnpacker, self).__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
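
# ArchiveChecker performs the consistency check: it indexes every object in the
# repository, identifies the encryption key, rebuilds the manifest if it is
# missing, walks all archive metadata to rebuild reference counts (repairing
# damaged items when repair is enabled), and finally reports or deletes
# orphaned chunks.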

class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def check(self, repository, repair=False):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts()
        self.verify_chunks()
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            # b'\xa7version\x01' is the msgpack encoding of the key 'version'
            # (fixstr of length 7) followed by the packed integer value 1
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            except Exception:
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest
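
    # rebuild_refcounts below re-serializes every archive's item stream through
    # a fresh ChunkBuffer: readable items are kept, missing file chunks are
    # replaced by zero-filled chunks of the same size, and the archive metadata
    # is rewritten to point at the new item chunks.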

    def rebuild_refcounts(self):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        yield item

        repository = cache_if_remote(self.repository)
        num_archives = len(self.manifest.archives)
        for i, (name, info) in enumerate(list(self.manifest.archives.items()), 1):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def verify_chunks(self):
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
        self.manifest.write()
        self.repository.commit()