archive.py
from binascii import hexlify
from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import groupby
import shutil
import tempfile
from attic.key import key_factory
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from attic import xattr
from attic.chunker import chunkify
from attic.hashindex import ChunkIndex
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe, StableDict

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')

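# Streams archive items out of the repository: chunks are fetched by id,
# decrypted, and incrementally unpacked from the msgpack stream into item
# dicts; file data chunks can optionally be preloaded to hide latency.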
class DownloadPipeline:

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)

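# Accumulates packed item dicts in an in-memory buffer and cuts the buffer
# into content-defined chunks once it exceeds BUFFER_SIZE; the last partial
# chunk is carried over so chunk boundaries stay content-defined across
# flushes. Subclasses decide where finished chunks go via write_chunk().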
class ChunkBuffer:
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key

    def add(self, item):
        self.buffer.write(self.packer.pack(StableDict(item)))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in chunkify(self.buffer, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE

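# ChunkBuffer subclass that routes item metadata chunks through the cache,
# so identical chunks are deduplicated instead of stored again.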
class CacheChunkBuffer(ChunkBuffer):

    def __init__(self, cache, key, stats):
        super(CacheChunkBuffer, self).__init__(key)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return id_

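# An Archive ties everything together: in create mode it buffers new items
# and periodically saves checkpoint archives; otherwise it loads existing
# archive metadata by id and exposes iteration, extraction, deletion and
# statistics over the stored items.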
class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
        self.id = id
        data = self.key.decrypt(self.id, self.repository.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc) + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        self.items_buffer.add(item)
        now = time.time()
        if now - self.last_checkpoint > self.checkpoint_interval:
            self.last_checkpoint = now
            self.write_checkpoint()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)

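    # save() flushes the remaining buffered item metadata, stores the archive
    # metadata block under its id hash, registers it in the manifest, and
    # commits both the repository and the cache.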
    def save(self, name=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

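    # Statistics are derived by decrementing refcounts inside a throwaway
    # cache transaction: a chunk whose count is 1 at decrement time is
    # referenced only by this archive, which is what the unique flag to
    # stats.update() records. The transaction is rolled back afterwards, so
    # the real cache state is untouched.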
    def calc_stats(self, cache):
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats

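    # Extraction recreates one item at a time: absolute paths and paths
    # starting with '..' are rejected, any existing entry at the target path
    # is removed first, and the file type is dispatched on the stored mode.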
    def extract_item(self, item, restore_attrs=True, dry_run=False):
        if dry_run:
            if b'chunks' in item:
                for _ in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    pass
            return
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        fd.write(data)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

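    # Attribute restore prefers an open file descriptor where one is
    # available (fchown/fchmod, and os.utime with ns= on Python >= 3.3);
    # symlinks need the l*/follow_symlinks=False variants instead.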
    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                xattr.setxattr(fd or path, k, v)
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to OS-specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))

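    # Deleting an archive walks its item stream and decrements the refcount
    # of every metadata and file chunk; chunks whose refcount reaches zero
    # can then be removed from the repository by the cache.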
    def delete(self, cache):
        unpacker = msgpack.Unpacker(use_list=False)
        for id_, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(id_, data))
            self.cache.chunk_decref(id_)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.repository.commit()
        cache.commit()

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': st_mtime_ns(st),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        return item

    def process_item(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

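    # Regular files: hard links to already-seen inodes are stored as a
    # reference to the first path; otherwise the files cache is consulted
    # (path hash plus stat data) so unchanged files can reuse their known
    # chunk ids and only modified files get re-chunkified.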
    def process_file(self, path, st, cache):
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)

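# ArchiveChecker verifies and optionally repairs a repository: it builds a
# temporary chunk index of everything in the repository, rebuilds a missing
# manifest by scanning for archive metadata blocks, recomputes chunk
# references per archive, and replaces missing file chunks with zero-filled
# placeholders so damaged archives stay extractable.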
class ArchiveChecker:

    def __init__(self):
        self.error_found = False
        self.progress = True
        self.possibly_superseded = set()
        self.tmpdir = tempfile.mkdtemp()

    def __del__(self):
        shutil.rmtree(self.tmpdir)

    def init_chunks(self):
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex.create(os.path.join(self.tmpdir, 'chunks').encode('utf-8'), capacity=capacity)
        marker = None
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def report_progress(self, msg, error=False):
        if error:
            self.error_found = True
        if error or self.progress:
            print(msg, file=sys.stderr)
            sys.stderr.flush()

    def identify_key(self, repository):
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

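    # Manifest recovery is heuristic: every chunk is decrypted and unpacked,
    # and anything that looks like an archive metadata dict (has b'items'
    # and b'cmdline' keys) is re-registered in a fresh manifest.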
    def rebuild_manifest(self):
        self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            try:
                archive = msgpack.unpackb(data)
            except Exception:
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True)
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        self.report_progress('Manifest rebuild complete', error=True)
        return manifest

    def check(self, repository, progress=True, repair=False):
        self.report_progress('Starting archive consistency check...')
        self.repair = repair
        self.progress = progress
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_chunks()
        self.verify_chunks()
        if not self.error_found:
            self.report_progress('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def verify_chunks(self):
        unused = set()
        for id_, (count, size, csize) in self.chunks.iteritems():
            if count == 0:
                unused.add(id_)
        orphaned = unused - self.possibly_superseded
        if orphaned:
            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
        if self.repair:
            for id_ in unused:
                self.repository.delete(id_)
            self.manifest.write()
            self.repository.commit()

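    # The repair pass re-reads every archive, verifies each file's chunk
    # list against the chunk index, rewrites the item metadata stream, and
    # records superseded ids so verify_chunks() can later delete objects
    # that ended up with no references at all.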
    def rebuild_chunks(self):
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def record_unused(id_):
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True)
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

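        # After a damaged run of item chunks, the msgpack stream may resume
        # mid-object; advance byte by byte until the remaining data unpacks
        # to a dict with a b'path' key, i.e. the start of the next intact
        # item.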
        def msgpack_resync(data):
            data = memoryview(data)
            while data:
                unpacker = msgpack.Unpacker()
                unpacker.feed(data)
                item = next(unpacker)
                if isinstance(item, dict) and b'path' in item:
                    return data
                data = data[1:]

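        # robust_iterator walks the item chunk list with itertools.groupby
        # and a two-state detector: even states are runs of chunks present
        # in the index, odd states are runs with missing chunks. Damaged
        # runs are reported and skipped; decoding resynchronizes on the
        # first chunk of the following intact run.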
        def robust_iterator(archive):
            prev_state = None
            state = 0

            def missing_chunk_detector(chunk_id):
                nonlocal state
                if state % 2 != int(chunk_id not in self.chunks):
                    state += 1
                return state

            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                if state != prev_state:
                    unpacker = msgpack.Unpacker(object_hook=StableDict)
                    prev_state = state
                if state % 2:
                    self.report_progress('Archive metadata damage detected', error=True)
                    continue  # skip the damaged run and resync on the next one
                items = list(items)
                for i, (chunk_id, cdata) in enumerate(zip(items, self.repository.get_many(items))):
                    data = self.key.decrypt(chunk_id, cdata)
                    if state and i == 0:
                        data = msgpack_resync(data)
                    unpacker.feed(data)
                    for item in unpacker:
                        yield item

        num_archives = len(self.manifest.archives)
        for i, (name, info) in enumerate(list(self.manifest.archives.items()), 1):
            self.report_progress('Analyzing archive {} ({}/{})'.format(name, i, num_archives))
            archive_id = info[b'id']
            if archive_id not in self.chunks:
                self.report_progress('Archive metadata block is missing', error=True)
                del self.manifest.archives[name]
                continue
            items_buffer = ChunkBuffer(self.key)
            items_buffer.write_chunk = add_callback
            cdata = self.repository.get(archive_id)
            data = self.key.decrypt(archive_id, cdata)
            archive = StableDict(msgpack.unpackb(data))
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
            for item in robust_iterator(archive):
                if b'chunks' in item:
                    verify_file_chunks(item)
                items_buffer.add(item)
            items_buffer.flush(flush=True)
            for previous_item_id in archive[b'items']:
                record_unused(previous_item_id)
            archive[b'items'] = items_buffer.chunks
            data = msgpack.packb(archive, unicode_errors='surrogateescape')
            new_archive_id = self.key.id_hash(data)
            cdata = self.key.encrypt(data)
            add_reference(new_archive_id, len(data), len(cdata), cdata)
            record_unused(archive_id)
            info[b'id'] = new_archive_id
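

# A minimal usage sketch (hypothetical driver code, not part of this module;
# it assumes Repository comes from attic.repository and is opened the way
# the attic CLI does it):
#
#     repository = Repository('/path/to/repo')
#     manifest, key = Manifest.load(repository)
#     archive = Archive(repository, key, manifest, 'my-backup')
#     for item in archive.iter_items(preload=True):
#         archive.extract_item(item)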