from binascii import hexlify
from datetime import datetime, timezone
from getpass import getuser
from itertools import groupby
import errno

from .logger import create_logger
logger = create_logger()

from .key import key_factory
from .remote import cache_if_remote
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    parse_timestamp, to_localtime, format_time, format_timedelta, \
    Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    ProgressIndicatorPercent
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
import msgpack

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (12, 16, 14, HASH_WINDOW_SIZE)
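
# Parameter order sketch (matching the Chunker(...) calls below, with `seed`
# standing in for key.chunk_seed): the tuple is (chunk_min_exp, chunk_max_exp,
# hash_mask_bits, hash_window_size), so the defaults cut chunks between
# 2**19 (512 KiB) and 2**23 (8 MiB), averaging ~2**21 (2 MiB):
#
#   chunker = Chunker(seed, *CHUNKER_PARAMS)  # == Chunker(seed, 19, 23, 21, 0xfff)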

has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
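
# Usage sketch (repository/key/item_ids are stand-ins for objects from the
# surrounding code): items arrive decrypted and decoded, with their data
# chunks already queued for download:
#
#   pipeline = DownloadPipeline(repository, key)
#   for item in pipeline.unpack_many(item_ids, preload=True):
#       ...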


class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_
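
# Buffering contract, as a usage sketch (cache/key/stats/item as in the
# surrounding code): subclasses supply write_chunk(); a final flush(flush=True)
# also writes the trailing partial chunk, as Archive.save() does below:
#
#   buf = CacheChunkBuffer(cache, key, stats)
#   buf.add(item)            # msgpack-packs the item dict into the buffer
#   buf.flush(flush=True)    # buf.chunks now lists the ids of written chunks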


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False,
                 chunker_params=CHUNKER_PARAMS, start=None, end=None):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if start is None:
            start = datetime.utcnow()
        self.chunker_params = chunker_params
        self.start = start
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
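            # Probe for a free checkpoint name: "<name>.checkpoint",
            # then "<name>.checkpoint.1", "<name>.checkpoint.2", ...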
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])
        self.zeros = b'\0' * (1 << chunker_params[1])

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return hexlify(self.id).decode('ascii')

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end):   {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}'''.format(
            self,
            start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
            end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        unknown_keys = set(item) - ITEM_KEYS
        assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
                                  ','.join(k.decode('ascii') for k in unknown_keys))
        if self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
            'chunker_params': self.chunker_params,
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
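
    # For reference, save() performs: flush item buffer -> pack + store the
    # archive metadata chunk -> register it in the manifest -> manifest.write()
    # -> repository.commit() -> cache.commit().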

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards.
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            try:
                os.symlink(source, path)
            except UnicodeEncodeError:
                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
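
    # Sparse extraction note: seeking past an all-zero chunk (instead of
    # writing it) leaves a hole; the final truncate(pos) above ensures the
    # file gets its full length even when it ends in a hole.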

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    #           (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats, progress=False):
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'atime': int_to_bigint(st.st_atime_ns),
            b'ctime': int_to_bigint(st.st_ctime_ns),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache, ignore_inode=False):
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
        if first_run:
            logger.info('processing files')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)
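
    # Call sketch (path hypothetical): Archive._open_rb('some/file') prefers
    # O_NOATIME (honored for root or the file's owner) and retries without it
    # on PermissionError, so reading files for backup avoids updating atime
    # whenever the OS allows it.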


# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
ITEM_KEYS = set([b'path', b'source', b'rdev', b'chunks',
                 b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
                 b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended', ])


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    def __init__(self, validator):
        super().__init__()
        self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
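
# Resync sketch (damaged_stream_bytes is a hypothetical bytes object): after a
# missing items chunk, the checker calls resync() and keeps feeding; __next__()
# then scans byte by byte for something that looks like a msgpack fixmap
# starting with a known item key:
#
#   unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
#   unpacker.resync()
#   unpacker.feed(damaged_stream_bytes)
#   for item in unpacker:
#       ...  # iteration resumes from the first plausible item boundary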


class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        logger.info('Rebuilding missing manifest, this might take some time...')
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state
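
            # _state starts even and is incremented whenever chunk
            # availability flips, so groupby() below yields alternating runs:
            # even state == chunks present (unpack them), odd state == chunks
            # missing (report them and resync the unpacker).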
            def report(msg, chunk_id, chunk_no):
                cid = hexlify(chunk_id).decode('ascii')
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            if isinstance(item, dict):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.warning('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)