archive.py

from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import groupby
import errno
import shutil
import tempfile
from attic.key import key_factory
from attic.remote import cache_if_remote
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from attic import xattr
from attic.chunker import chunkify
from attic.hashindex import ChunkIndex
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
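
# Note (not part of the original source; based on how rolling-hash chunkers
# typically behave): with CHUNK_MASK = 0xffff a cut point is expected roughly
# every 2**16 bytes, so chunks average about 64 KiB, bounded below by
# CHUNK_MIN (1 KiB), with WINDOW_SIZE as the rolling-hash window in bytes.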


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
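
# Usage sketch (illustrative; `archive_metadata` is a placeholder for an archive's
# decoded metadata dict, as used by Archive.iter_items below):
#
#     pipeline = DownloadPipeline(repository, key)
#     for item in pipeline.unpack_many(archive_metadata[b'items'], preload=True):
#         ...  # each item is a decoded item dict; chunk reads are preloaded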


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in chunkify(self.buffer, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
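
# ChunkBuffer is used as an abstract base here: write_chunk() raises
# NotImplementedError, so callers either subclass it (CacheChunkBuffer below) or
# assign a callable to write_chunk (as ArchiveChecker.rebuild_refcounts does).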


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if not self.checkpoint_name in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
        self.id = id
        data = self.key.decrypt(self.id, self.repository.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc) + timedelta(seconds=float('.' + f))
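
    # Worked example (illustrative): metadata[b'time'] is an isoformat() string
    # such as '2014-01-01T12:30:45.123456'; ts splits it at the first '.', parses
    # the integral part with strptime and adds the fraction back as a timedelta,
    # giving an aware UTC datetime.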

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)
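
    # Checkpoint flow (descriptive summary): add_item() calls write_checkpoint()
    # roughly every checkpoint_interval seconds; that saves a partial archive under
    # checkpoint_name (which commits repository and cache), then removes the
    # checkpoint entry from the manifest and drops its metadata chunk, so the data
    # written so far stays committed while the final save() replaces it.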

    def save(self, name=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
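
    # Creation sketch (illustrative only; argument values are placeholders):
    #
    #     archive = Archive(repository, key, manifest, 'my-backup', cache=cache, create=True)
    #     archive.process_file('home/user/file.txt', os.lstat('home/user/file.txt'), cache)
    #     archive.save()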

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False):
        if dry_run:
            if b'chunks' in item:
                for _ in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    pass
            return
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignoring errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        fd.write(data)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v)
                except OSError as e:
                    if e.errno != errno.ENOTSUP:
                        raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to OS-specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))
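
    # Note (descriptive): item[b'mtime'] is stored in nanoseconds (see st_mtime_ns
    # in stat_attrs below), so the ns= form of os.utime is used where available and
    # the value is divided by 10**9 to get seconds on older Pythons.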

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': st_mtime_ns(st),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        return item

    def process_item(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
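
    # File deduplication (descriptive summary): if the files cache reports the file
    # as known and unchanged and every remembered chunk id is still present, the
    # existing chunks are reference-counted via chunk_incref(); otherwise the file
    # is re-read, chunked with chunkify() and the new chunk ids memorized for the
    # next run.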

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)


class RobustUnpacker():
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super(RobustUnpacker, self).__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
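
# Resync heuristic (descriptive): after resync() the unpacker scans the buffered
# byte stream one byte at a time for something that looks like an item dict:
# (data[0] & 0xf0) == 0x80 matches a msgpack fixmap header and
# (data[1] & 0xe0) == 0xa0 matches a fixstr first key, which is then compared
# against the known item keys ('path', 'mode', ...) before a real unpack is tried.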


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def check(self, repository, repair=False):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if not Manifest.MANIFEST_ID in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts()
        self.verify_chunks()
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found
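
    # Usage sketch (illustrative only): a full check is typically driven as
    #
    #     checker = ArchiveChecker()
    #     ok = checker.check(repository, repair=False)
    #
    # which rebuilds the manifest if it is missing, recomputes reference counts and
    # deletes orphaned objects only when repair=True.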

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex.create(os.path.join(self.tmpdir, 'chunks').encode('utf-8'), capacity=capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if not b'cmdline' in data or not b'\xa7version\x01' in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            except:
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest
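
    # Note (descriptive): b'\xa7version\x01' is the msgpack encoding of the key
    # 'version' (a fixstr of length 7) followed by the positive fixint 1, so the
    # substring test above cheaply filters for version-1 archive metadata blocks
    # before attempting a full unpack.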

    def rebuild_refcounts(self):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if not chunk_id in self.chunks:
                    # If a file chunk is missing, create an all-zero replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(not chunk_id in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        yield item

        repository = cache_if_remote(self.repository)
        num_archives = len(self.manifest.archives)
        for i, (name, info) in enumerate(list(self.manifest.archives.items()), 1):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, i, num_archives))
            archive_id = info[b'id']
            if not archive_id in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def verify_chunks(self):
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
            self.manifest.write()
            self.repository.commit()