# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from zlib import crc32
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. from . import NS_ARCHIVE_METADATA, NS_CHUNK
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, Counter
  14. CHUNK_SIZE = 64 * 1024
  15. WINDOW_SIZE = 4096
  16. have_lchmod = hasattr(os, 'lchmod')
  17. linux = sys.platform == 'linux2'
  18. class Archive(object):
  19. class DoesNotExist(Exception):
  20. pass
  21. def __init__(self, store, key, name=None, cache=None):
  22. self.key = key
  23. self.store = store
  24. self.cache = cache
  25. self.items = ''
  26. self.items_refs = []
  27. self.items_ids = []
  28. self.hard_links = {}
  29. if name:
  30. self.load(self.key.archive_hash(name))
    def load(self, id):
        """Load and validate the archive metadata stored under chunk *id*.

        Raises Archive.DoesNotExist if the metadata chunk is missing.
        """
        self.id = id
        try:
            data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        # Only format version 1 is understood by this code.
        assert self.metadata['version'] == 1
  39. @property
  40. def ts(self):
  41. """Timestamp of archive creation in UTC"""
  42. t, f = self.metadata['time'].split('.', 1)
  43. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  44. def iter_items(self, callback):
  45. unpacker = msgpack.Unpacker()
  46. counter = Counter(0)
  47. def cb(chunk, error, id):
  48. counter.dec()
  49. print len(chunk)
  50. data, items_hash = self.key.decrypt(chunk)
  51. assert self.key.id_hash(data) == id
  52. unpacker.feed(data)
  53. for item in unpacker:
  54. callback(item)
  55. for id, size, csize in self.metadata['items']:
  56. # Limit the number of concurrent items requests to 3
  57. self.store.flush_rpc(counter, 10)
  58. counter.inc()
  59. self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
  60. def add_item(self, item, refs=None):
  61. data = msgpack.packb(item)
  62. if crc32(item['path'].encode('utf-8')) % 1000 == 0:
  63. self.flush_items()
  64. if refs:
  65. self.items_refs += refs
  66. self.items += data
  67. def flush_items(self):
  68. if not self.items:
  69. return
  70. print 'flush', len(self.items)
  71. id = self.key.id_hash(self.items)
  72. if self.cache.seen_chunk(id):
  73. self.items_ids.append(self.cache.chunk_incref(id))
  74. for id in self.items_refs:
  75. self.cache.chunk_decref(id)
  76. else:
  77. self.items_ids.append(self.cache.add_chunk(id, self.items))
  78. self.items = ''
  79. self.items_refs = []
    def save(self, name, cache):
        """Flush pending items and persist the archive metadata under *name*.

        Commits both the store and the cache transaction.
        """
        self.id = self.key.archive_hash(name)
        self.flush_items()
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.key.encrypt(msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()
        cache.commit()
    def stats(self, cache):
        """Return {'osize', 'csize', 'usize'} totals for this archive.

        osize/csize are original/compressed sizes of all referenced chunks;
        usize counts compressed bytes unique to this archive.
        """
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        def cb(chunk, error, (id, unique)):
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        count, _, _ = self.cache.chunks[id]
                        stats['osize'] += size
                        stats['csize'] += csize
                        # NOTE(review): a content chunk is counted as unique
                        # only when the enclosing items chunk was unique and
                        # the (progressively decremented) refcount hits 1.
                        if unique and count == 1:
                            stats['usize'] += csize
                        self.cache.chunks[id] = count - 1, size, csize
                except KeyError:
                    # Item carries no content chunks (dir/symlink/fifo).
                    pass
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = {'osize': 0, 'csize': 0, 'usize': 0}
        for id, size, csize in self.metadata['items']:
            # Account for the item-metadata chunks themselves as well.
            stats['osize'] += size
            stats['csize'] += csize
            unique = self.cache.seen_chunk(id) == 1
            if unique:
                stats['usize'] += csize
            self.store.get(NS_CHUNK, id, callback=cb, callback_data=(id, unique))
            self.cache.chunk_decref(id)
        self.store.flush_rpc()
        # Undo all the refcount mutations performed above.
        cache.rollback()
        return stats
  129. def extract_item(self, item, dest=None, start_cb=None):
  130. dest = dest or os.getcwdu()
  131. dir_stat_queue = []
  132. assert item['path'][0] not in ('/', '\\', ':')
  133. path = os.path.join(dest, item['path'].decode('utf-8'))
  134. mode = item['mode']
  135. if stat.S_ISDIR(mode):
  136. if not os.path.exists(path):
  137. os.makedirs(path)
  138. self.restore_attrs(path, item)
  139. elif stat.S_ISFIFO(mode):
  140. if not os.path.exists(os.path.dirname(path)):
  141. os.makedirs(os.path.dirname(path))
  142. os.mkfifo(path)
  143. self.restore_attrs(path, item)
  144. elif stat.S_ISLNK(mode):
  145. if not os.path.exists(os.path.dirname(path)):
  146. os.makedirs(os.path.dirname(path))
  147. source = item['source']
  148. if os.path.exists(path):
  149. os.unlink(path)
  150. os.symlink(source, path)
  151. self.restore_attrs(path, item, symlink=True)
  152. elif stat.S_ISREG(mode):
  153. if not os.path.exists(os.path.dirname(path)):
  154. os.makedirs(os.path.dirname(path))
  155. # Hard link?
  156. if 'source' in item:
  157. source = os.path.join(dest, item['source'].decode('utf-8'))
  158. if os.path.exists(path):
  159. os.unlink(path)
  160. os.link(source, path)
  161. else:
  162. def extract_cb(chunk, error, (id, i)):
  163. if i == 0:
  164. state['fd'] = open(path, 'wb')
  165. start_cb(item)
  166. assert not error
  167. data, hash = self.key.decrypt(chunk)
  168. if self.key.id_hash(data) != id:
  169. raise IntegrityError('chunk hash did not match')
  170. state['fd'].write(data)
  171. if i == n - 1:
  172. state['fd'].close()
  173. self.restore_attrs(path, item)
  174. state = {}
  175. n = len(item['chunks'])
  176. ## 0 chunks indicates an empty (0 bytes) file
  177. if n == 0:
  178. open(path, 'wb').close()
  179. start_cb(item)
  180. self.restore_attrs(path, item)
  181. else:
  182. for i, (id, size, csize) in enumerate(item['chunks']):
  183. self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i))
  184. else:
  185. raise Exception('Unknown archive item type %r' % item['mode'])
    def restore_attrs(self, path, item, symlink=False):
        """Apply the mode, ownership, mtime and xattrs stored in *item* to *path*.

        With symlink=True, operations that would follow the link are skipped
        unless a link-safe variant (lchmod/lchown) exists.
        """
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    # NOTE(review): only KeyError is swallowed here; xattr
                    # backends typically raise IOError — confirm intent.
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        # Prefer the archived user/group names; fall back to the numeric ids.
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            # Not privileged to chown; continue restoring regardless.
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))
  208. def verify_file(self, item, start, result):
  209. def verify_chunk(chunk, error, (id, i, last)):
  210. if i == 0:
  211. start(item)
  212. assert not error
  213. data, hash = self.key.decrypt(chunk)
  214. if self.key.id_hash(data) != id:
  215. result(item, False)
  216. elif last:
  217. result(item, True)
  218. n = len(item['chunks'])
  219. if n == 0:
  220. start(item)
  221. result(item, True)
  222. else:
  223. for i, (id, size, csize) in enumerate(item['chunks']):
  224. self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
    def delete(self, cache):
        """Delete the archive: drop every chunk reference it holds, remove
        the metadata chunk, then commit both the store and the cache."""
        def callback(chunk, error, id):
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                try:
                    # Drop the references to this item's content chunks.
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    # Item has no 'chunks' key (dir/symlink/fifo).
                    pass
            self.cache.chunk_decref(id)
        unpacker = msgpack.Unpacker()
        for id, size, csize in self.metadata['items']:
            # Only fetch and walk an items chunk when this archive holds the
            # last reference to it; otherwise just drop our reference.
            if self.cache.seen_chunk(id) == 1:
                self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
            else:
                self.cache.chunk_decref(id)
        self.store.flush_rpc()
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        self.store.commit()
        cache.commit()
    def stat_attrs(self, st, path):
        """Build the metadata dict (mode, ownership, mtime, xattrs) for an
        entry described by stat result *st* at *path*."""
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                # NOTE(review): the prefix test is 'user', not 'user.' —
                # confirm whether the dot was intended.
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            # Filesystem/path without xattr support: store none.
            pass
        return item
  268. def process_dir(self, path, st):
  269. item = {'path': path.lstrip('/\\:')}
  270. item.update(self.stat_attrs(st, path))
  271. self.add_item(item)
  272. def process_fifo(self, path, st):
  273. item = {'path': path.lstrip('/\\:')}
  274. item.update(self.stat_attrs(st, path))
  275. self.add_item(item)
  276. def process_symlink(self, path, st):
  277. source = os.readlink(path)
  278. item = {'path': path.lstrip('/\\:'), 'source': source}
  279. item.update(self.stat_attrs(st, path))
  280. self.add_item(item)
  281. def process_file(self, path, st, cache):
  282. safe_path = path.lstrip('/\\:')
  283. # Is it a hard link?
  284. if st.st_nlink > 1:
  285. source = self.hard_links.get((st.st_ino, st.st_dev))
  286. if (st.st_ino, st.st_dev) in self.hard_links:
  287. item = self.stat_attrs(st, path)
  288. item.update({'path': safe_path, 'source': source})
  289. self.add_item(item)
  290. return
  291. else:
  292. self.hard_links[st.st_ino, st.st_dev] = safe_path
  293. path_hash = self.key.id_hash(path.encode('utf-8'))
  294. ids = cache.file_known_and_unchanged(path_hash, st)
  295. chunks = None
  296. if ids is not None:
  297. # Make sure all ids are available
  298. for id in ids:
  299. if not cache.seen_chunk(id):
  300. break
  301. else:
  302. chunks = [cache.chunk_incref(id) for id in ids]
  303. # Only chunkify the file if needed
  304. if chunks is None:
  305. with open(path, 'rb') as fd:
  306. chunks = []
  307. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  308. self.key.chunk_seed):
  309. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
  310. ids = [id for id, _, _ in chunks]
  311. cache.memorize_file(path_hash, st, ids)
  312. item = {'path': safe_path, 'chunks': chunks}
  313. item.update(self.stat_attrs(st, path))
  314. self.add_item(item, ids)
  315. @staticmethod
  316. def list_archives(store, key):
  317. for id in list(store.list(NS_ARCHIVE_METADATA)):
  318. archive = Archive(store, key)
  319. archive.load(id)
  320. yield archive