from binascii import hexlify
from datetime import datetime
from getpass import getuser
from itertools import groupby
import errno
import logging

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, format_timedelta, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int, have_cython

if have_cython():
    from .platform import acl_get, acl_set
    from .chunker import Chunker
    from .hashindex import ChunkIndex
    import msgpack
else:
    import mock
    msgpack = mock.Mock()

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 10  # 2**10 == 1kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 16  # results in ~64kiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)
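
# With the defaults above, Chunker(seed, *CHUNKER_PARAMS) cuts a byte stream at
# content-defined boundaries: chunk sizes fall between 2**CHUNK_MIN_EXP (1 kiB)
# and 2**CHUNK_MAX_EXP (8 MiB) and average about 2**HASH_MASK_BITS bytes
# (~64 kiB), using a rolling hash over a HASH_WINDOW_SIZE byte window.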

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:
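    """Fetch and decrypt archive item metadata from the repository.

    unpack_many() feeds the decrypted chunks through a streaming msgpack
    unpacker and yields item dicts; with preload=True it also asks the
    repository to preload the referenced file chunks.
    """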

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
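    """Pack added item dicts with msgpack and cut the resulting byte stream
    into content-defined chunks, handing each finished chunk to write_chunk()
    (which subclasses must implement).
    """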
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class Archive:
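    """A single archive: either newly created (create=True) or loaded from the
    manifest for iteration, extraction, renaming or deletion.
    """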

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS,
                 start=datetime.now(), end=datetime.now()):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.start = start
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats, chunker_params)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
        self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        return parse_timestamp(self.metadata[b'time'])

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''Archive name: {0.name}
Archive fingerprint: {0.fpr}
Start time: {0.start:%c}
End time: {0.end:%c}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(self)

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding())
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    #           (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
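
    # Illustrative only (example values, not part of the original source):
    # stat_attrs() below returns an item dict shaped like
    #   {b'mode': 0o100644, b'uid': 1000, b'user': 'alice',
    #    b'gid': 1000, b'group': 'alice', b'mtime': <bigint ns>}
    # plus optional b'xattrs' and b'bsdflags' entries.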

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st))
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000)
        }
        self.add_item(item)
        return 'i'  # stdin
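
    # Status codes returned by the process_* methods above and process_file
    # below: 'A' added, 'M' modified, 'U' unchanged, 'h' hardlink, 'd' directory,
    # 's' symlink, 'f' fifo, 'c'/'b' char/block device, 'i' stdin.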

    def process_file(self, path, st, cache):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st)
        if first_run:
            logger.info('processing files')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker.

    After resync(), incoming data is buffered and scanned byte by byte for the
    start of a plausible item dict, so that unpacking can continue after a
    damaged or missing metadata chunk.
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super().__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                # (msgpack encodes a small dict as a fixmap, 0b1000xxxx, whose
                # first key is a fixstr, 0b101xxxxx)
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
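

# Minimal usage sketch for RobustUnpacker (the validator and input data are
# hypothetical, not part of this module):
#
#   unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
#   unpacker.resync()               # after skipping a damaged/missing chunk
#   unpacker.feed(decrypted_bytes)  # scans for the next plausible item dict
#   for item in unpacker:
#       ...                         # yields validated items, then resumes normal streaming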


class ArchiveChecker:
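    """Check archive consistency: verify that every chunk referenced by the
    manifest and the archives exists, and (with repair=True) replace missing
    file chunks with same-size all-zero chunks and rewrite damaged metadata.
    """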

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None):
        self.report_progress('Starting archive consistency check...')
        self.check_all = archive is None and last is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last)
        self.orphan_chunks_check()
        self.finish()
        if not self.error_found:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        logger.log(logging.ERROR if error else logging.WARNING, msg)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def rebuild_refcounts(self, archive=None, last=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state
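
            # _state stays even across runs of present chunk ids and odd across
            # runs of missing ones, so the groupby() below yields alternating
            # groups of intact and damaged item metadata chunks.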

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        if not isinstance(item, dict):
                            self.report_progress('Did not get expected metadata dict - archive corrupted!',
                                                 error=True)
                            continue
                        yield item

        repository = cache_if_remote(self.repository)
        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            num_archives = len(self.manifest.archives)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
        for i, (name, info) in enumerate(archive_items[:end]):
            logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))
            archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')

    def finish(self):
        if self.repair:
            self.manifest.write()
            self.repository.commit()
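

# Minimal end-to-end sketch (assumptions: the Repository import path and the
# repo location are hypothetical; neither is defined in this module):
#
#   from borg.repository import Repository
#   repository = Repository('/path/to/repo')
#   checker = ArchiveChecker()
#   ok = checker.check(repository, repair=False)
#   # ok is True unless problems were found (check() returns repair or not error_found)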