from datetime import datetime
from getpass import getuser
from itertools import groupby
import errno
import shutil
import tempfile
import threading
from .key import key_factory
from .remote import cache_if_remote
import msgpack
from multiprocessing import cpu_count
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .platform import acl_get, acl_set
from .chunker import Chunker
from .hashindex import ChunkIndex
from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
    make_queue, TerminatedQueue

ITEMS_BUFFER = 1024 * 1024

CHUNK_MIN = 1024
CHUNK_MAX = 10 * 1024 * 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

ZEROS = b'\0' * CHUNK_MAX

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
utime_supports_follow_symlinks = os.utime in getattr(os, 'supports_follow_symlinks', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')

# Python <= 3.2 raises OSError instead of PermissionError (See #164)
try:
    PermissionError = PermissionError
except NameError:
    PermissionError = OSError


class DownloadPipeline:
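    """Fetch, decrypt and unpack item metadata from the repository.

    unpack_many() optionally filters the unpacked items and preloads their
    data chunks so that later fetch_many() calls are served efficiently.
    """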

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
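    """Accumulate msgpack-encoded items and cut the stream into chunks.

    Items are packed into an in-memory buffer; whenever the buffer grows
    beyond BUFFER_SIZE it is chunked and each chunk is written out via
    write_chunk(), which subclasses implement.
    """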

    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_


class ParallelProcessor:
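    """Multi-threaded archive creation pipeline: reader -> crypters -> writer.

    The reader thread chunks file contents, the crypter threads hash and
    encrypt chunks that were not seen before, and the writer thread stores
    chunks via the cache and reassembles finished items. The delayer thread
    re-queues items that still wait for size information of duplicate chunks.
    """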

    def __init__(self, archive, ncrypters=None):
        self.archive = archive
        if ncrypters is None:
            # note: cpu_count for 2 cores with HT is 4
            # put load on all logical cores and avoid idle cores
            ncrypters = cpu_count()
        self.ncrypters = ncrypters
        self.start_threads()

    def reader(self):
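        """Take items from reader_queue, chunk file contents, feed crypter_queue.

        For every item, an EOF marker carrying the total chunk count is sent to
        the writer_queue. A None element is the poison pill stopping the thread.
        """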
        while True:
            elem = self.reader_queue.get()
            if elem is None:
                self.reader_queue.task_done()
                break
            item = elem
            n = 0
            # Only chunkify the file if needed
            if b'chunks' in item and item[b'chunks'] is None:
                fd, fh = item.pop(b'fd', None), -1
                if fd is None:
                    fh = Archive._open_rb(item.pop(b'path_name'), item[b'st'])
                    fd = os.fdopen(fh, 'rb')
                with fd:
                    for chunk in self.archive.chunker.chunkify(fd, fh):
                        # important: chunk is a memoryview - make a copy or it will
                        # have changed when we use it!
                        chunk = bytes(chunk)
                        self.crypter_queue.put((item, n, chunk))
                        n += 1
            self.writer_queue.put((item, n, None, None, None, None))  # signal EOF via id == None, give number of chunks
            self.reader_queue.task_done()

    def crypter(self):
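        """Hash chunks and encrypt the ones not seen before, feed writer_queue."""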
        while True:
            elem = self.crypter_queue.get()
            if elem is None:
                self.crypter_queue.task_done()
                break
            item, n, chunk = elem
            size = len(chunk)
            id = self.archive.key.id_hash(chunk)
            seen = self.archive.cache.seen_or_announce_chunk(id, size)
            if not seen:
                # we have never seen this id before, so we need to process it
                # TODO check if this creates duplicate IV/CTR values for AES
                cchunk = self.archive.key.encrypt(chunk)
                csize = len(cchunk)
            else:
                cchunk, csize = None, None
            self.writer_queue.put((item, n, cchunk, id, size, csize))
            self.crypter_queue.task_done()

    def writer(self):
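        """Store new chunks via the cache, reassemble completed items, add them.

        Items whose duplicate-chunk sizes are not yet known are handed to the
        delayer_queue and retried later.
        """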
        item_infos = {}  # item path -> info dict
        size_infos = {}  # chunk id -> sizes
        dying = False
        while True:
            elem = self.writer_queue.get()
            if elem is None:
                if not dying:
                    # received poison from stop_threads, start dying,
                    # but still do work the delayer thread might give us.
                    dying = True
                    # give poison to the delayer thread
                    self.delayer_queue.put(None)
                    self.writer_queue.task_done()
                    continue
                else:
                    # we received the final poison from the dying delayer
                    self.writer_queue.task_done()
                    # we are dead now
                    break
            item, n, cchunk, id, size, csize = elem
            path = item[b'path']
            info = item_infos.setdefault(path, dict(count=None, chunks=[]))
            if id is None:
                if n is not None:  # note: n == None is a retry
                    # EOF signalled, n is the total count of chunks
                    info['count'] = n
            else:
                size, csize, new_chunk = self.archive.cache.add_chunk_nostats(cchunk, id, size, csize)
                info['chunks'].append((n, id, new_chunk))
                if csize != 0:
                    size_infos[id] = (size, csize)
            if len(info['chunks']) == info['count']:
                # we have processed all chunks or no chunks needed processing
                if b'chunks' in item:
                    chunks = item[b'chunks']
                    if chunks is None:
                        # we want chunks, but we have no chunk id list yet, compute them
                        try:
                            chunks = self.archive.cache.postprocess_results(
                                size_infos, info['chunks'], self.archive.stats)
                        except self.archive.cache.ChunkSizeNotReady:
                            # we looked up a chunk id, but do not have the size info yet. retry later.
                            self.delayer_queue.put((item, None, None, None, None, None))
                            self.writer_queue.task_done()
                            continue
                    else:
                        # we have a chunk id list already, increase the ref counters, compute sizes
                        chunks = [self.archive.cache.chunk_incref(id_, self.archive.stats) for id_ in chunks]
                    item[b'chunks'] = chunks
                    path_hash = item.pop(b'path_hash', None)
                    if path_hash and chunks is not None:  # a fs object (not stdin) and a regular file
                        st = item.pop(b'st', None)
                        self.archive.cache.memorize_file(path_hash, st, [c[0] for c in chunks])
                del item_infos[path]
                self.archive.stats.nfiles += 1
                self.archive.add_item(item)
            self.writer_queue.task_done()

    def delayer(self):
        # it is a pain that we need the compressed size for the chunks cache as it is not
        # available for duplicate chunks until the original chunk has finished processing.
        # this loop of (writer, delayer) with pipes connecting them is a hack to address
        # this, but it makes thread teardown complicated. Rather get rid of csize?
        while True:
            elem = self.delayer_queue.get()
            if elem is None:
                # we received poison from dying writer thread, kill the writer, too.
                self.writer_queue.put(None)
                self.delayer_queue.task_done()
                # we are dead now
                break
            time.sleep(0.001)  # reschedule, avoid data circulating too fast
            self.writer_queue.put(elem)
            self.delayer_queue.task_done()

    def start_threads(self):
        def run_thread(func, name=None, daemon=False):
            t = threading.Thread(target=func, name=name)
            t.daemon = daemon
            t.start()
            return t

        # max. memory usage of a queue with chunk data is about queue_len * CHUNK_MAX
        queue_len = min(max(self.ncrypters, 4), 8)
        self.reader_queue = make_queue('reader', queue_len * 10)  # small items (no chunk data)
        self.crypter_queue = make_queue('crypter', queue_len)
        self.writer_queue = make_queue('writer', queue_len)
        self.delayer_queue = make_queue('delay', queue_len)
        self.reader_thread = run_thread(self.reader, 'reader')
        self.crypter_threads = []
        for i in range(self.ncrypters):
            self.crypter_threads.append(run_thread(self.crypter, name='crypter-%d' % i))
        self.delayer_thread = run_thread(self.delayer, name='delayer')
        self.writer_thread = run_thread(self.writer, name='writer')

    def wait_finish(self):
        self.reader_queue.join()
        self.crypter_queue.join()
        self.writer_queue.join()
        self.delayer_queue.join()
        self.writer_queue.join()

    def stop_threads(self):
        count_before = threading.active_count()
        # for every thread:
        # put poison pill into its queue,
        # wait until queue is processed (and thread has terminated itself),
        # make queue unusable
        self.reader_queue.put(None)
        self.reader_queue.join()
        self.reader_thread.join()
        self.reader_queue = TerminatedQueue()
        for i in range(self.ncrypters):
            self.crypter_queue.put(None)
        self.crypter_queue.join()
        for t in self.crypter_threads:
            t.join()
        self.crypter_queue = TerminatedQueue()
        self.writer_queue.put(None)  # the writer will poison the delayer first
        self.delayer_thread.join()
        self.delayer_queue = TerminatedQueue()
        self.writer_thread.join()
        self.writer_queue = TerminatedQueue()
        count_after = threading.active_count()
        assert count_before - 3 - self.ncrypters == count_after
        if count_after > 1:
            print('They are alive!')
            tl = [t.name for t in threading.enumerate()]
            tl.remove('MainThread')
            assert tl == []


class Archive:
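    """A named backup archive inside a repository.

    With create=True, items are pushed through the ParallelProcessor and the
    resulting metadata is buffered in a CacheChunkBuffer until save() writes
    the archive and commits repository and cache. Otherwise, the metadata of
    an existing archive is loaded for listing, extraction or deletion.
    """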

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False, progress=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.show_progress = progress
        self.last_progress = time.time()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            self.pp = ParallelProcessor(self)
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, CHUNK_MAX, self.key.chunk_seed)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            self.pp = None
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def close(self):
        if self.pp:
            self.pp.stop_threads()

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = msgpack.unpackb(data)
        if metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        return parse_timestamp(self.metadata[b'time'])

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item_queued(self, item):
        self.pp.reader_queue.put(item)

    def add_item(self, item):
        if self.show_progress and time.time() - self.last_progress > 0.2:
            self.stats.show_progress(item=item)
            self.last_progress = time.time()
        self.items_buffer.add(item)
        if time.time() - self.last_checkpoint > self.checkpoint_interval:
            self.write_checkpoint()
            self.last_checkpoint = time.time()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, timestamp=None):
        self.pp.wait_finish()
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            timestamp = datetime.utcnow()
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': timestamp.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = cache.chunks[id]
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
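        """Extract a single archive item below the current working directory.

        Handles regular files (optionally sparse), directories, hard links,
        symlinks, FIFOs and device nodes; with dry_run or stdout, data is only
        fetched (and optionally written to stdout) instead of being restored.
        """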
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                # XXX do not remove a regular file, it could be the "source"
                # of a hardlink - a still empty inode that needs to be filled.
                pass
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding())
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                if not os.path.exists(source):
                    # due to multithreaded nature and different processing time,
                    # the hardlink (without file content) often is in the archive
                    # BEFORE the "source" file (with content).
                    # we create an empty file that is filled with content when
                    # the "source" item is extracted:
                    with open(source, 'wb') as fd:
                        pass
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and ZEROS.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
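        """Restore xattrs, ownership, mode, timestamps, ACLs and BSD flags on path (or fd)."""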
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v, follow_symlinks=False)
                except OSError as e:
                    if e.errno != errno.ENOTSUP:
                        raise
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(mtime, mtime))
        elif utime_supports_follow_symlinks:  # Python >= 3.3
            os.utime(path, None, ns=(mtime, mtime), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (mtime / 1e9, mtime / 1e9))
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]

    def delete(self, stats):
        unpacker = msgpack.Unpacker(use_list=False)
        for items_id, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': int_to_bigint(st_mtime_ns(st))
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item

    def process_dir(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item_queued(item)
        return 'd'  # directory

    def process_fifo(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item_queued(item)
        return 'f'  # fifo

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item_queued(item)
        if stat.S_ISCHR(st.st_mode):
            return 'c'  # char device
        elif stat.S_ISBLK(st.st_mode):
            return 'b'  # block device

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item_queued(item)
        return 's'  # symlink

    def process_stdin(self, path, cache):
        uid, gid = 0, 0
        item = {
            b'path': path,
            b'fd': sys.stdin.buffer,  # binary
            b'chunks': None,  # no chunk ids yet, the reader thread will chunkify the fd
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': int_to_bigint(int(time.time()) * 1000000000)
        }
        self.add_item_queued(item)
        return 'A'

    def process_file(self, path, st, cache):
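        """Queue a regular file for backup, reusing cached chunk ids when unchanged.

        Returns a one-letter status: 'h' (hardlink to an already seen inode),
        'U' (unchanged), 'A' (added) or 'M' (modified).
        """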
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item_queued(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = ids
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        if chunks is None:
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item = {
            b'path': safe_path,
            b'path_name': path,
            b'path_hash': path_hash,
            b'chunks': chunks,
            b'st': st,
        }
        item.update(self.stat_attrs(st, path))
        self.add_item_queued(item)
        return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path, st):
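        """Open path for reading, preferring O_NOATIME where permitted.

        On first call, Archive._open_rb is rebound to the cheapest strategy
        that works: plain open, open with O_NOATIME, or O_NOATIME only for
        files owned by the effective user.
        """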
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        euid = None

        def open_simple(p, s):
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd

        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        return Archive._open_rb(path, st)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    item_keys = [msgpack.packb(name) for name in ('path', 'mode', 'source', 'chunks', 'rdev', 'xattrs', 'user', 'group', 'uid', 'gid', 'mtime')]

    def __init__(self, validator):
        super(RobustUnpacker, self).__init__()
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)


class ArchiveChecker:
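    """Verify and optionally repair archive metadata and chunk references.

    check() rebuilds the manifest if it is missing, recomputes chunk reference
    counts from the archive metadata, and reports (or, with repair=True,
    deletes/replaces) missing and orphaned objects.
    """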

    def __init__(self):
        self.error_found = False
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def check(self, repository, repair=False, last=None):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(last=last)
        if last is None:
            self.verify_chunks()
        else:
            self.report_progress('Orphaned objects check skipped (needs all archives checked)')
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        print(msg, file=sys.stderr if error else sys.stdout)

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def rebuild_refcounts(self, last=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue
                if state > 0:
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    for item in unpacker:
                        yield item

        repository = cache_if_remote(self.repository)
        num_archives = len(self.manifest.archives)
        archive_items = sorted(self.manifest.archives.items(), reverse=True,
                               key=lambda name_info: name_info[1][b'time'])
        end = None if last is None else min(num_archives, last)
        for i, (name, info) in enumerate(archive_items[:end]):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            mark_as_possibly_superseded(archive_id)
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                mark_as_possibly_superseded(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            info[b'id'] = new_archive_id

    def verify_chunks(self):
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
            self.manifest.write()
            self.repository.commit()