from datetime import datetime
from getpass import getuser
from itertools import groupby
import errno
import shutil
import tempfile
from attic.key import key_factory
from attic.remote import cache_if_remote
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from attic import xattr
from attic.platform import acl_get, acl_set
from attic.chunker import Chunker
from attic.hashindex import ChunkIndex
from attic.helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN = 1024
CHUNK_MAX = 10 * 1024 * 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

ZEROS = b'\0' * CHUNK_MAX
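# With a 4095-byte rolling-hash window and a 16-bit mask, the chunker cuts at
# content-defined boundaries averaging roughly 2**16 = 64 KiB, clamped to the
# CHUNK_MIN/CHUNK_MAX range above.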

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version_info >= (3, 3)  # compare version tuples; as strings, '3.10' < '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
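
# Illustrative usage sketch (comment only, not executed; `repository`, `key`
# and `item_ids` stand in for live attic objects): with preload=True, the
# chunk reads for each item are queued before extraction consumes them, so
# repository round trips overlap with decryption:
#
#   pipeline = DownloadPipeline(repository, key)
#   for item in pipeline.unpack_many(item_ids, preload=True):
#       ...  # fetch_many(..., is_preloaded=True) then yields the file data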


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
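
# Illustrative sketch (comment only, not executed): the item metadata stream
# is itself chunked with the same content-defined chunker as file data, so
# unchanged runs of items deduplicate across archives; the ids returned by
# write_chunk() accumulate in .chunks and become the archive's b'items' list:
#
#   buffer = CacheChunkBuffer(cache, key, Statistics())
#   buffer.add({b'path': 'some/file', b'mode': 0o100644})
#   buffer.flush(flush=True)
#   item_chunk_ids = buffer.chunks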


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        return parse_timestamp(self.metadata[b'time'])

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
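
    # Note on add(): stats.update(..., count == 1) credits a chunk to the
    # "unique" totals only when this archive holds its last remaining
    # reference; the in-transaction decrement makes repeated references from
    # within this archive hit count == 1 at most once, and cache.rollback()
    # then undoes all of the decrements.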

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding())
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and ZEROS.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    # seek() past EOF does not extend the file by itself;
                    # truncate() sets the final length so a trailing hole is
                    # materialized
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v, follow_symlinks=False)
                except OSError as e:
                    if e.errno != errno.ENOTSUP:
                        raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st)),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000),
        }
        self.add_item(item)

    def process_file(self, path, st, cache):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status
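
    # Note: cache.file_known_and_unchanged() returns the previously stored
    # chunk ids only while the files cache entry still matches the file's
    # current inode, size and mtime; any mismatch yields None and forces a
    # re-chunk of the file.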

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        # the os attribute is O_NOATIME (Linux, Python >= 3.3); a bare
        # NO_ATIME lookup would always fall back to 0 and never take effect
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)
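
# The open_* helpers above form a self-optimizing dispatch: the first EPERM
# triggered by O_NOATIME rebinds Archive._open_rb to the owner-checking
# variant, so the fallback cost is paid at most once per process.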


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super(RobustUnpacker, self).__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
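
# Illustrative sketch (comment only, not executed): after damaged input,
# resync() switches the unpacker into scanning mode; it buffers further feeds
# and slides forward one byte at a time until something parses as an item
# dict accepted by the validator, then resumes normal streaming:
#
#   unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
#   unpacker.resync()
#   unpacker.feed(partially_damaged_bytes)
#   for item in unpacker:
#       ...  # the first item yielded is the one that passed the validator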


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def check(self, repository, repair=False, last=None):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(last=last)
        if last is None:
            self.verify_chunks()
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def rebuild_refcounts(self, last=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(
                        item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        yield item

        repository = cache_if_remote(self.repository)
        num_archives = len(self.manifest.archives)
        archive_items = sorted(self.manifest.archives.items(), reverse=True,
                               key=lambda name_info: name_info[1][b'time'])
        end = None if last is None else min(num_archives, last)
        for i, (name, info) in enumerate(archive_items[:end]):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def verify_chunks(self):
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
            self.manifest.write()
            self.repository.commit()
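
# Illustrative usage sketch (comment only, not executed; `repository` stands
# in for an open attic Repository): a full consistency check walks every
# archive; with repair=True, missing file chunks are replaced by all-zero
# substitutes and the rewritten metadata is committed:
#
#   checker = ArchiveChecker()
#   ok = checker.check(repository, repair=False)
#   if not ok:
#       ...  # rerun with repair=True to attempt repairs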