# archive.py (~12 KB) — reconstructed from a line-numbered web paste;
# the original viewer chrome (file size, gutter numbers) has been removed.
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from cStringIO import StringIO
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. from . import NS_ARCHIVE_METADATA, NS_CHUNK
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, Counter
  14. CHUNK_SIZE = 64 * 1024
  15. WINDOW_SIZE = 4096
  16. have_lchmod = hasattr(os, 'lchmod')
  17. linux = sys.platform == 'linux2'
  18. class Archive(object):
  19. class DoesNotExist(Exception):
  20. pass
  21. def __init__(self, store, key, name=None, cache=None):
  22. self.key = key
  23. self.store = store
  24. self.cache = cache
  25. self.items = StringIO()
  26. self.items_ids = []
  27. self.hard_links = {}
  28. if name:
  29. self.load(self.key.archive_hash(name))
  30. def load(self, id):
  31. self.id = id
  32. try:
  33. data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  34. except self.store.DoesNotExist:
  35. raise self.DoesNotExist
  36. self.metadata = msgpack.unpackb(data)
  37. assert self.metadata['version'] == 1
  38. @property
  39. def ts(self):
  40. """Timestamp of archive creation in UTC"""
  41. t, f = self.metadata['time'].split('.', 1)
  42. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  43. def iter_items(self, callback):
  44. unpacker = msgpack.Unpacker()
  45. counter = Counter(0)
  46. def cb(chunk, error, id):
  47. assert not error
  48. counter.dec()
  49. data, items_hash = self.key.decrypt(chunk)
  50. assert self.key.id_hash(data) == id
  51. unpacker.feed(data)
  52. for item in unpacker:
  53. callback(item)
  54. for id, size, csize in self.metadata['items']:
  55. # Limit the number of concurrent items requests to 10
  56. self.store.flush_rpc(counter, 10)
  57. counter.inc()
  58. self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
  59. def add_item(self, item):
  60. self.items.write(msgpack.packb(item))
  61. if self.items.tell() > 1024 * 1024:
  62. self.flush_items()
  63. def flush_items(self, flush=False):
  64. if self.items.tell() == 0:
  65. return
  66. self.items.seek(0)
  67. chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
  68. self.items.seek(0)
  69. self.items.truncate()
  70. for chunk in chunks[:-1]:
  71. self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunk), chunk))
  72. if flush or len(chunks) == 1:
  73. self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1]))
  74. else:
  75. self.items.write(chunks[-1])
  76. def save(self, name, cache):
  77. self.id = self.key.archive_hash(name)
  78. self.flush_items(flush=True)
  79. metadata = {
  80. 'version': 1,
  81. 'name': name,
  82. 'items': self.items_ids,
  83. 'cmdline': sys.argv,
  84. 'hostname': socket.gethostname(),
  85. 'username': getuser(),
  86. 'time': datetime.utcnow().isoformat(),
  87. }
  88. data, self.hash = self.key.encrypt(msgpack.packb(metadata))
  89. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  90. self.store.commit()
  91. cache.commit()
  92. def stats(self, cache):
  93. # This function is a bit evil since it abuses the cache to calculate
  94. # the stats. The cache transaction must be rolled back afterwards
  95. def cb(chunk, error, id):
  96. assert not error
  97. data, items_hash = self.key.decrypt(chunk)
  98. assert self.key.id_hash(data) == id
  99. unpacker.feed(data)
  100. for item in unpacker:
  101. try:
  102. for id, size, csize in item['chunks']:
  103. count, _, _ = self.cache.chunks[id]
  104. stats['osize'] += size
  105. stats['csize'] += csize
  106. if count == 1:
  107. stats['usize'] += csize
  108. self.cache.chunks[id] = count - 1, size, csize
  109. except KeyError:
  110. pass
  111. unpacker = msgpack.Unpacker()
  112. cache.begin_txn()
  113. stats = {'osize': 0, 'csize': 0, 'usize': 0}
  114. for id, size, csize in self.metadata['items']:
  115. stats['osize'] += size
  116. stats['csize'] += csize
  117. if self.cache.seen_chunk(id) == 1:
  118. stats['usize'] += csize
  119. self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
  120. self.cache.chunk_decref(id)
  121. self.store.flush_rpc()
  122. cache.rollback()
  123. return stats
  124. def extract_item(self, item, dest=None, start_cb=None):
  125. dest = dest or os.getcwdu()
  126. dir_stat_queue = []
  127. assert item['path'][0] not in ('/', '\\', ':')
  128. path = os.path.join(dest, item['path'].decode('utf-8'))
  129. mode = item['mode']
  130. if stat.S_ISDIR(mode):
  131. if not os.path.exists(path):
  132. os.makedirs(path)
  133. self.restore_attrs(path, item)
  134. elif stat.S_ISFIFO(mode):
  135. if not os.path.exists(os.path.dirname(path)):
  136. os.makedirs(os.path.dirname(path))
  137. os.mkfifo(path)
  138. self.restore_attrs(path, item)
  139. elif stat.S_ISLNK(mode):
  140. if not os.path.exists(os.path.dirname(path)):
  141. os.makedirs(os.path.dirname(path))
  142. source = item['source']
  143. if os.path.exists(path):
  144. os.unlink(path)
  145. os.symlink(source, path)
  146. self.restore_attrs(path, item, symlink=True)
  147. elif stat.S_ISREG(mode):
  148. if not os.path.exists(os.path.dirname(path)):
  149. os.makedirs(os.path.dirname(path))
  150. # Hard link?
  151. if 'source' in item:
  152. source = os.path.join(dest, item['source'].decode('utf-8'))
  153. if os.path.exists(path):
  154. os.unlink(path)
  155. os.link(source, path)
  156. else:
  157. def extract_cb(chunk, error, (id, i)):
  158. if i == 0:
  159. state['fd'] = open(path, 'wb')
  160. start_cb(item)
  161. assert not error
  162. data, hash = self.key.decrypt(chunk)
  163. if self.key.id_hash(data) != id:
  164. raise IntegrityError('chunk hash did not match')
  165. state['fd'].write(data)
  166. if i == n - 1:
  167. state['fd'].close()
  168. self.restore_attrs(path, item)
  169. state = {}
  170. n = len(item['chunks'])
  171. ## 0 chunks indicates an empty (0 bytes) file
  172. if n == 0:
  173. open(path, 'wb').close()
  174. start_cb(item)
  175. self.restore_attrs(path, item)
  176. else:
  177. for i, (id, size, csize) in enumerate(item['chunks']):
  178. self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i))
  179. else:
  180. raise Exception('Unknown archive item type %r' % item['mode'])
  181. def restore_attrs(self, path, item, symlink=False):
  182. xattrs = item.get('xattrs')
  183. if xattrs:
  184. xa = xattr(path, XATTR_NOFOLLOW)
  185. for k, v in xattrs.items():
  186. try:
  187. xa.set(k, v)
  188. except KeyError:
  189. pass
  190. if have_lchmod:
  191. os.lchmod(path, item['mode'])
  192. elif not symlink:
  193. os.chmod(path, item['mode'])
  194. uid = user2uid(item['user']) or item['uid']
  195. gid = group2gid(item['group']) or item['gid']
  196. try:
  197. os.lchown(path, uid, gid)
  198. except OSError:
  199. pass
  200. if not symlink:
  201. # FIXME: We should really call futimes here (c extension required)
  202. os.utime(path, (item['mtime'], item['mtime']))
  203. def verify_file(self, item, start, result):
  204. def verify_chunk(chunk, error, (id, i, last)):
  205. if i == 0:
  206. start(item)
  207. assert not error
  208. data, hash = self.key.decrypt(chunk)
  209. if self.key.id_hash(data) != id:
  210. result(item, False)
  211. elif last:
  212. result(item, True)
  213. n = len(item['chunks'])
  214. if n == 0:
  215. start(item)
  216. result(item, True)
  217. else:
  218. for i, (id, size, csize) in enumerate(item['chunks']):
  219. self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
  220. def delete(self, cache):
  221. def callback(chunk, error, id):
  222. assert not error
  223. data, items_hash = self.key.decrypt(chunk)
  224. assert self.key.id_hash(data) == id
  225. unpacker.feed(data)
  226. for item in unpacker:
  227. try:
  228. for chunk_id, size, csize in item['chunks']:
  229. self.cache.chunk_decref(chunk_id)
  230. except KeyError:
  231. pass
  232. self.cache.chunk_decref(id)
  233. unpacker = msgpack.Unpacker()
  234. for id, size, csize in self.metadata['items']:
  235. self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
  236. self.store.flush_rpc()
  237. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  238. self.store.commit()
  239. cache.commit()
  240. def stat_attrs(self, st, path):
  241. item = {
  242. 'mode': st.st_mode,
  243. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  244. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  245. 'mtime': st.st_mtime,
  246. }
  247. try:
  248. xa = xattr(path, XATTR_NOFOLLOW)
  249. xattrs = {}
  250. for key in xa:
  251. # Only store the user namespace on Linux
  252. if linux and not key.startswith('user'):
  253. continue
  254. xattrs[key] = xa[key]
  255. if xattrs:
  256. item['xattrs'] = xattrs
  257. except IOError:
  258. pass
  259. return item
  260. def process_dir(self, path, st):
  261. item = {'path': path.lstrip('/\\:')}
  262. item.update(self.stat_attrs(st, path))
  263. self.add_item(item)
  264. def process_fifo(self, path, st):
  265. item = {'path': path.lstrip('/\\:')}
  266. item.update(self.stat_attrs(st, path))
  267. self.add_item(item)
  268. def process_symlink(self, path, st):
  269. source = os.readlink(path)
  270. item = {'path': path.lstrip('/\\:'), 'source': source}
  271. item.update(self.stat_attrs(st, path))
  272. self.add_item(item)
  273. def process_file(self, path, st, cache):
  274. safe_path = path.lstrip('/\\:')
  275. # Is it a hard link?
  276. if st.st_nlink > 1:
  277. source = self.hard_links.get((st.st_ino, st.st_dev))
  278. if (st.st_ino, st.st_dev) in self.hard_links:
  279. item = self.stat_attrs(st, path)
  280. item.update({'path': safe_path, 'source': source})
  281. self.add_item(item)
  282. return
  283. else:
  284. self.hard_links[st.st_ino, st.st_dev] = safe_path
  285. path_hash = self.key.id_hash(path.encode('utf-8'))
  286. ids = cache.file_known_and_unchanged(path_hash, st)
  287. chunks = None
  288. if ids is not None:
  289. # Make sure all ids are available
  290. for id in ids:
  291. if not cache.seen_chunk(id):
  292. break
  293. else:
  294. chunks = [cache.chunk_incref(id) for id in ids]
  295. # Only chunkify the file if needed
  296. if chunks is None:
  297. with open(path, 'rb') as fd:
  298. chunks = []
  299. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  300. self.key.chunk_seed):
  301. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
  302. ids = [id for id, _, _ in chunks]
  303. cache.memorize_file(path_hash, st, ids)
  304. item = {'path': safe_path, 'chunks': chunks}
  305. item.update(self.stat_attrs(st, path))
  306. self.add_item(item)
  307. @staticmethod
  308. def list_archives(store, key):
  309. for id in list(store.list(NS_ARCHIVE_METADATA)):
  310. archive = Archive(store, key)
  311. archive.load(id)
  312. yield archive