from datetime import datetime
from getpass import getuser
from itertools import groupby
import errno
from .key import key_factory
from .remote import cache_if_remote
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 10  # 2**10 == 1kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 16  # results in ~64kiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

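# Worked sanity check of what the defaults above mean (illustrative only;
# the chunker itself just receives the CHUNKER_PARAMS tuple):
assert 1 << CHUNK_MIN_EXP == 1024              # no chunk smaller than 1 kiB
assert 1 << CHUNK_MAX_EXP == 8 * 1024 * 1024   # no chunk larger than 8 MiB
assert 1 << HASH_MASK_BITS == 64 * 1024        # ~64 kiB average chunk size
assert HASH_WINDOW_SIZE == 4095                # rolling-hash window, in bytes
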
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
# compare version_info tuples, not version strings ('3.10' < '3.3' as a string)
has_mtime_ns = sys.version_info >= (3, 3)
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:
    """Fetch, decrypt and unpack item metadata from the repository."""

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


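# Hedged usage sketch, never called from this module: stream only regular-file
# items from a DownloadPipeline while preloading their data chunks. `pipeline`
# and `item_ids` are assumed inputs (see Archive.iter_items for the real call
# site of unpack_many).
def _example_iter_regular_files(pipeline, item_ids):
    def only_regular(item):
        return stat.S_ISREG(item[b'mode'])
    return pipeline.unpack_many(item_ids, filter=only_regular, preload=True)

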
class ChunkBuffer:
    """Accumulate msgpack-packed items and cut the stream into chunks.

    write_chunk() must be provided by a subclass; it decides where a
    finished chunk is stored and returns its id.
    """
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


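# Hedged sketch of the ChunkBuffer contract (unused here): a subclass only
# needs to override write_chunk() to decide where a chunk goes and which id
# to record. This in-memory variant is illustrative, not part of the backup
# path; `key` is assumed to be a key object as used throughout this module.
class _ExampleMemoryChunkBuffer(ChunkBuffer):

    def __init__(self, key):
        super().__init__(key)
        self.stored = {}  # chunk id -> raw chunk bytes

    def write_chunk(self, chunk):
        id_ = self.key.id_hash(chunk)
        self.stored[id_] = bytes(chunk)
        return id_

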
class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats, chunker_params)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        return parse_timestamp(self.metadata[b'time'])

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding())
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st))
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000)
        }
        self.add_item(item)

    def process_file(self, path, st, cache):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        # os.O_NOATIME is the actual attribute name (Linux only); getattr
        # keeps this working on platforms that do not provide it.
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)


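# Hedged end-to-end sketch (never called here): extract an existing archive
# into `dest`. `repository`, `key` and `manifest` are assumed to come from
# the repository-opening code elsewhere in this package. The chdir happens
# first because Archive captures os.getcwd() in __init__ and extract_item()
# joins item paths onto that directory.
def _example_extract_archive(repository, key, manifest, name, dest):
    os.chdir(dest)
    archive = Archive(repository, key, manifest, name)
    for item in archive.iter_items(preload=True):
        archive.extract_item(item)

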
class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super().__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


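# Hedged sketch of the resync protocol (unused): after a missing metadata
# chunk, call resync() and keep feeding; __next__ then scans byte-by-byte for
# something that validates as an item dict before resuming normal unpacking.
# `metadata_chunks` is an assumed iterable of already-decrypted chunks.
def _example_recover_items(metadata_chunks):
    unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
    unpacker.resync()
    items = []
    for data in metadata_chunks:
        unpacker.feed(data)
        items.extend(unpacker)
    return items

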
class ArchiveChecker:
    """Check (and optionally repair) the consistency of a repository's archives."""

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None):
        self.report_progress('Starting archive consistency check...')
        self.check_all = archive is None and last is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last)
        self.orphan_chunks_check()
        self.finish()
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def rebuild_refcounts(self, archive=None, last=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        if not isinstance(item, dict):
                            self.report_progress('Did not get expected metadata dict - archive corrupted!',
                                                 error=True)
                            continue
                        yield item

        repository = cache_if_remote(self.repository)
        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            num_archives = len(self.manifest.archives)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
        for i, (name, info) in enumerate(archive_items[:end]):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')

    def finish(self):
        if self.repair:
            self.manifest.write()
            self.repository.commit()


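# Hedged usage sketch (not called in this module): run a read-only consistency
# check over an already-open repository; `repository` is an assumed input.
def _example_check_repository(repository):
    checker = ArchiveChecker()
    ok = checker.check(repository, repair=False)
    return ok  # True when no problems were found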