2
0

archive.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from os.path import dirname
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. from . import NS_ARCHIVE_METADATA, NS_CHUNK
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Chunker parameters: target chunk size and rolling-hash window size,
# passed to chunkify() in process_file()
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096
# os.lchmod only exists on some platforms (e.g. BSD/OS X); probe once here
have_lchmod = hasattr(os, 'lchmod')
# sys.platform is 'linux2' on Linux under Python 2
linux = sys.platform == 'linux2'
class Archive(object):
    """A backup archive: item metadata plus references to data chunks in the store."""

    class DoesNotExist(Exception):
        """Raised by load() when the requested archive is not in the store."""
        pass
  21. def __init__(self, store, key, name=None, cache=None):
  22. self.key = key
  23. self.store = store
  24. self.cache = cache
  25. self.items = ''
  26. self.items_refs = []
  27. self.items_prefix = ''
  28. self.items_ids = []
  29. self.hard_links = {}
  30. if name:
  31. self.load(self.key.archive_hash(name))
  32. def load(self, id):
  33. self.id = id
  34. try:
  35. data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  36. except self.store.DoesNotExist:
  37. raise self.DoesNotExist
  38. self.metadata = msgpack.unpackb(data)
  39. assert self.metadata['version'] == 1
  40. @property
  41. def ts(self):
  42. """Timestamp of archive creation in UTC"""
  43. t, f = self.metadata['time'].split('.', 1)
  44. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  45. def get_items(self):
  46. unpacker = msgpack.Unpacker()
  47. for id, size, csize in self.metadata['items']:
  48. data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
  49. assert self.key.id_hash(data) == id
  50. unpacker.feed(data)
  51. for item in unpacker:
  52. yield item
  53. def add_item(self, item, refs=None):
  54. data = msgpack.packb(item)
  55. prefix = dirname(item['path'])
  56. if self.items_prefix and self.items_prefix != prefix:
  57. self.flush_items()
  58. if refs:
  59. self.items_refs += refs
  60. self.items += data
  61. self.items_prefix = prefix
  62. def flush_items(self):
  63. if not self.items:
  64. return
  65. id = self.key.id_hash(self.items)
  66. if self.cache.seen_chunk(id):
  67. self.items_ids.append(self.cache.chunk_incref(id))
  68. for id in self.items_refs:
  69. self.cache.chunk_decref(id)
  70. else:
  71. self.items_ids.append(self.cache.add_chunk(id, self.items))
  72. self.items = ''
  73. self.items_refs = []
  74. self.items_prefix = ''
  75. def save(self, name, cache):
  76. self.id = self.key.archive_hash(name)
  77. self.flush_items()
  78. metadata = {
  79. 'version': 1,
  80. 'name': name,
  81. 'items': self.items_ids,
  82. 'cmdline': sys.argv,
  83. 'hostname': socket.gethostname(),
  84. 'username': getuser(),
  85. 'time': datetime.utcnow().isoformat(),
  86. }
  87. data, self.hash = self.key.encrypt(msgpack.packb(metadata))
  88. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  89. self.store.commit()
  90. cache.commit()
    def stats(self, cache):
        """Calculate size statistics for this archive.

        Returns a dict with 'osize' (original), 'csize' (compressed) and
        'usize' (unique, i.e. not shared with other archives) byte counts.
        """
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        def cb(chunk, error, (id, unique)):
            # Invoked per fetched item-metadata chunk; tallies the data
            # chunks of each item it contains
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        count, _, _ = self.cache.chunks[id]
                        stats['osize'] += size
                        stats['csize'] += csize
                        # A chunk counts toward 'usize' when nothing else
                        # references it; the refcount is decremented below so
                        # repeats within this archive are only counted once
                        if unique and count == 1:
                            stats['usize'] += csize
                        self.cache.chunks[id] = count - 1, size, csize
                except KeyError:
                    # Item has no 'chunks' key (e.g. directory/symlink)
                    pass
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = {'osize': 0, 'csize': 0, 'usize': 0}
        for id, size, csize in self.metadata['items']:
            # The item-metadata chunks themselves are counted too
            stats['osize'] += size
            stats['csize'] += csize
            unique = self.cache.seen_chunk(id) == 1
            if unique:
                stats['usize'] += csize
            self.store.get(NS_CHUNK, id, callback=cb, callback_data=(id, unique))
            self.cache.chunk_decref(id)
        self.store.flush_rpc()
        # Undo all refcount mutations made while computing 'usize'
        cache.rollback()
        return stats
    def extract_item(self, item, dest=None, start_cb=None):
        """Recreate one archive item (dir, fifo, symlink or regular file).

        *dest* defaults to the current working directory.  For regular
        files, *start_cb* (when given) is called with the item before the
        first data chunk is written.
        """
        dest = dest or os.getcwdu()
        # NOTE(review): dir_stat_queue appears unused in this method
        dir_stat_queue = []
        # Stored paths are relative; reject absolute/drive-qualified ones
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            # symlink=True: restore attributes on the link itself, not the target
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'].decode('utf-8'))
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                def extract_cb(chunk, error, (id, i, last)):
                    # Invoked once per fetched data chunk.
                    # NOTE(review): fd.write ordering assumes callbacks
                    # arrive in request order -- confirm store semantics.
                    if i==0:
                        start_cb(item)
                    assert not error
                    data, hash = self.key.decrypt(chunk)
                    if self.key.id_hash(data) != id:
                        raise IntegrityError('chunk hash did not match')
                    fd.write(data)
                    if last:
                        # Final chunk: close the file and apply attributes
                        fd.close()
                        self.restore_attrs(path, item)
                fd = open(path, 'wb')
                n = len(item['chunks'])
                if n == 0:
                    # Empty file: nothing to fetch
                    start_cb(item)
                    self.restore_attrs(path, item)
                    fd.close()
                else:
                    for i, (id, size, csize) in enumerate(item['chunks']):
                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1))
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
    def restore_attrs(self, path, item, symlink=False):
        """Apply xattrs, mode, ownership and mtime from *item* to *path*."""
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    # NOTE(review): best-effort skip, but it is unclear that
                    # xa.set() can raise KeyError here -- IOError seems the
                    # likelier failure for unsupported attributes. TODO confirm.
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            # No lchmod available: chmod would follow a symlink, so skip links
            os.chmod(path, item['mode'])
        # Prefer the symbolic name mapping, falling back to the stored
        # numeric id.  NOTE(review): 'or' also falls through when the lookup
        # yields 0 (root) -- benign only if item['uid']/['gid'] agree; verify.
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            # Best effort: chown typically fails without sufficient privileges
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))
  201. def verify_file(self, item, start, result):
  202. def verify_chunk(chunk, error, (id, i, last)):
  203. if i == 0:
  204. start(item)
  205. assert not error
  206. data, hash = self.key.decrypt(chunk)
  207. if self.key.id_hash(data) != id:
  208. result(item, False)
  209. elif last:
  210. result(item, True)
  211. n = len(item['chunks'])
  212. if n == 0:
  213. start(item)
  214. result(item, True)
  215. else:
  216. for i, (id, size, csize) in enumerate(item['chunks']):
  217. self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
    def delete(self, cache):
        """Delete this archive: release all its chunk references and remove
        its metadata entry from the store."""
        def cb(chunk, error, id):
            # Invoked for item-metadata chunks only this archive references;
            # releases the data chunks of every item they contain
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    # Item carries no data chunks (e.g. directory/symlink)
                    pass
            # Finally release the item-metadata chunk itself
            self.cache.chunk_decref(id)
        unpacker = msgpack.Unpacker()
        for id, size, csize in self.metadata['items']:
            if self.cache.seen_chunk(id) == 1:
                # Last reference: must walk its items to decref their chunks
                self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
            else:
                self.cache.chunk_decref(id)
        self.store.flush_rpc()
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        self.store.commit()
        cache.commit()
  241. def stat_attrs(self, st, path):
  242. item = {
  243. 'mode': st.st_mode,
  244. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  245. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  246. 'mtime': st.st_mtime,
  247. }
  248. try:
  249. xa = xattr(path, XATTR_NOFOLLOW)
  250. xattrs = {}
  251. for key in xa:
  252. # Only store the user namespace on Linux
  253. if linux and not key.startswith('user'):
  254. continue
  255. xattrs[key] = xa[key]
  256. if xattrs:
  257. item['xattrs'] = xattrs
  258. except IOError:
  259. pass
  260. return item
  261. def process_dir(self, path, st):
  262. item = {'path': path.lstrip('/\\:')}
  263. item.update(self.stat_attrs(st, path))
  264. self.add_item(item)
  265. def process_fifo(self, path, st):
  266. item = {'path': path.lstrip('/\\:')}
  267. item.update(self.stat_attrs(st, path))
  268. self.add_item(item)
  269. def process_symlink(self, path, st):
  270. source = os.readlink(path)
  271. item = {'path': path.lstrip('/\\:'), 'source': source}
  272. item.update(self.stat_attrs(st, path))
  273. self.add_item(item)
  274. def process_file(self, path, st, cache):
  275. safe_path = path.lstrip('/\\:')
  276. # Is it a hard link?
  277. if st.st_nlink > 1:
  278. source = self.hard_links.get((st.st_ino, st.st_dev))
  279. if (st.st_ino, st.st_dev) in self.hard_links:
  280. item = self.stat_attrs(st, path)
  281. item.update({'path': safe_path, 'source': source})
  282. self.add_item(item)
  283. return
  284. else:
  285. self.hard_links[st.st_ino, st.st_dev] = safe_path
  286. path_hash = self.key.id_hash(path.encode('utf-8'))
  287. ids = cache.file_known_and_unchanged(path_hash, st)
  288. chunks = None
  289. if ids is not None:
  290. # Make sure all ids are available
  291. for id in ids:
  292. if not cache.seen_chunk(id):
  293. break
  294. else:
  295. chunks = [cache.chunk_incref(id) for id in ids]
  296. # Only chunkify the file if needed
  297. if chunks is None:
  298. with open(path, 'rb') as fd:
  299. chunks = []
  300. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  301. self.key.chunk_seed):
  302. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
  303. ids = [id for id, _, _ in chunks]
  304. cache.memorize_file(path_hash, st, ids)
  305. item = {'path': safe_path, 'chunks': chunks}
  306. item.update(self.stat_attrs(st, path))
  307. self.add_item(item, ids)
  308. @staticmethod
  309. def list_archives(store, key):
  310. for id in list(store.list(NS_ARCHIVE_METADATA)):
  311. archive = Archive(store, key)
  312. archive.load(id)
  313. yield archive