from binascii import hexlify
from datetime import datetime
from getpass import getuser
from itertools import groupby
import errno
import logging
from .logger import create_logger
logger = create_logger()
from .key import key_factory
from .remote import cache_if_remote
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, format_timedelta, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int, have_cython

if have_cython():
    from .platform import acl_get, acl_set
    from .chunker import Chunker
    from .hashindex import ChunkIndex
    import msgpack
else:
    import mock
    msgpack = mock.Mock()

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 10  # 2**10 == 1kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 16  # results in ~64kiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


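# The classes below implement the archive data path: DownloadPipeline fetches and
# decrypts item metadata chunks, ChunkBuffer/CacheChunkBuffer buffer serialized items
# and cut them into content-defined chunks, and Archive ties creation, extraction,
# renaming and deletion of archives together.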
class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


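# Archive represents one named archive in the repository: when created it buffers items
# into chunks via CacheChunkBuffer and periodically writes checkpoints; when loaded it
# exposes the stored metadata for listing, extraction, renaming and deletion.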
class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS,
                 start=datetime.now(), end=datetime.now()):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.start = start
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats, chunker_params)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
            self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        return parse_timestamp(self.metadata[b'time'])

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        buf = '''Archive name: {0.name}
Archive fingerprint: {0.fpr}
Start time: {0.start:%c}
End time: {0.end:%c}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
{0.cache}'''.format(self)
        return buf

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding())
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st))
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000)
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st)
        if first_run:
            logger.info('processing files')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super().__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


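# ArchiveChecker walks the whole repository: it indexes all chunk ids, rebuilds the
# manifest if it is missing, re-reads every archive's item metadata to rebuild chunk
# reference counts (replacing missing file chunks with all-zero chunks when repairing),
# and finally reports or deletes orphaned chunks.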
class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None):
        self.report_progress('Starting archive consistency check...')
        self.check_all = archive is None and last is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last)
        self.orphan_chunks_check()
        self.finish()
        if not self.error_found:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        logger.log(logging.ERROR if error else logging.WARNING, msg)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing
        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def rebuild_refcounts(self, archive=None, last=None):
        """Rebuild object reference counts by walking the metadata
        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present
            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items
            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        if not isinstance(item, dict):
                            self.report_progress('Did not get expected metadata dict - archive corrupted!',
                                                 error=True)
                            continue
                        yield item

        repository = cache_if_remote(self.repository)
        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            num_archives = len(self.manifest.archives)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1
        for i, (name, info) in enumerate(archive_items[:end]):
            logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')

    def finish(self):
        if self.repair:
            self.manifest.write()
            self.repository.commit()