from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import groupby
import errno
import shutil
import tempfile
from attic.key import key_factory
from attic.remote import cache_if_remote
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from attic import xattr
from attic.cache import Cache
from attic.platform import acl_get, acl_set
from attic.chunker import Chunker
from attic.hashindex import ChunkIndex
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int

ITEMS_BUFFER = 1024 * 1024
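
# Chunker parameters: the rolling-hash chunker cuts a chunk where the hash
# over a WINDOW_SIZE-byte window hits a pattern selected by the 16-bit
# CHUNK_MASK (so chunks presumably average about 64 KiB), clamped to the
# range [CHUNK_MIN, CHUNK_MAX].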
CHUNK_MIN = 1024
CHUNK_MAX = 10 * 1024 * 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

ZEROS = b'\0' * CHUNK_MAX

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key
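
    # With preload=True, unpack_many hands all file chunk ids of the filtered
    # items to repository.preload() up front so later fetch_many() calls can
    # be pipelined; this matters most for remote repositories, where fetches
    # would otherwise be sequential round trips.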
    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
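

# ChunkBuffer packs item dicts with msgpack and cuts the resulting stream
# into content-defined chunks with the same chunker used for file data, so
# archive metadata can deduplicate across archives as well.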
class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
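            # Find the first unused checkpoint name:
            # name.checkpoint, name.checkpoint.1, name.checkpoint.2, ...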
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t = self.metadata[b'time'].split('.', 1)
        dt = datetime.strptime(t[0], '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc)
        if len(t) > 1:
            dt += timedelta(seconds=float('.' + t[1]))
        return dt

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()
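
    # A checkpoint persists everything written so far under a temporary
    # archive name and commits, presumably so an interrupted backup can reuse
    # those chunks; the checkpoint entry is then dropped again so the next
    # checkpoint (or the final save) replaces it.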
    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
    def calc_stats(self):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize  # dirties cache.chunks!

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards.
        cache = Cache(self.repository, self.key, self.manifest)
        cache.begin_txn()
        stats = Statistics()
        unpacker = msgpack.Unpacker(use_list=False)
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and ZEROS.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
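                    # A hole at the end of the file is not materialized by
                    # seek() alone, so truncate() extends the file to its
                    # final length.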
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v, follow_symlinks=False)
                except OSError as e:
                    if e.errno != errno.ENOTSUP:
                        raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to OS-specific differences
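        # chown before chmod: on many systems chown() clears the set[ug]id
        # bits, so the mode has to be applied afterwards.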
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st))
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink
    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000)
        }
        self.add_item(item)
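
    # For each regular file, the id-hash of its absolute path is looked up in
    # the files cache; if the stat data is unchanged and all chunks are still
    # present, the stored chunk ids are re-referenced (status 'U') instead of
    # re-reading and re-chunking the file.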
    def process_file(self, path, st, cache):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)
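
    # _open_rb probes O_NOATIME support on first use and then rebinds
    # Archive._open_rb to the cheapest strategy that worked, so the probe
    # cost is paid only once per process.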
    @staticmethod
    def _open_rb(path, st):
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super(RobustUnpacker, self).__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
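                # (0x80-0x8f is a msgpack fixmap header and 0xa0-0xbf a
                # fixstr; an item dict starts with a fixmap whose first key
                # is a short string)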
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def check(self, repository, repair=False, last=None):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.manifest_id(repository) not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(last=last)
        if last is None:
            self.verify_chunks()
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found
    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity, key_size=self.repository.key_size)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)
    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
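            # b'\xa7version\x01' is the msgpack encoding of the 7-character
            # string 'version' followed by the integer 1, a cheap test for
            # version-1 archive metadata before fully unpacking.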
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest
    def rebuild_refcounts(self, last=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.manifest_id(self.repository)]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all-zero replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list
        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0
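            # _state stays even while item chunks are present and becomes odd
            # while they are missing, so groupby() yields alternating runs of
            # intact and damaged metadata chunks.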
            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        yield item
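
        # Walk the archives newest-first (only the most recent `last` if
        # given); each item stream is re-serialized through a fresh
        # ChunkBuffer and the rewritten metadata replaces the old one, whose
        # chunks are marked as possibly superseded.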
        repository = cache_if_remote(self.repository)
        num_archives = len(self.manifest.archives)
        archive_items = sorted(self.manifest.archives.items(), reverse=True,
                               key=lambda name_info: name_info[1][b'time'])
        end = None if last is None else min(num_archives, last)
        for i, (name, info) in enumerate(archive_items[:end]):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def verify_chunks(self):
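        # Chunks whose refcount is still 0 after rebuild_refcounts and that
        # did not belong to a superseded version of an archive are genuine
        # orphans.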
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
        self.manifest.write()
        self.repository.commit()