archive.py

from __future__ import with_statement
from datetime import datetime, timedelta
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from cStringIO import StringIO
from xattr import xattr, XATTR_NOFOLLOW

from . import NS_ARCHIVE_METADATA, NS_CHUNK
from ._speedups import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, \
    Counter, encode_filename, Statistics
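
# Buffer and chunker tuning: ITEMS_BUFFER caps how much item metadata is
# buffered in memory before flush_items() cuts and stores it; CHUNK_SIZE and
# WINDOW_SIZE are the target chunk size and rolling-hash window handed to
# _speedups.chunkify (a reading of their use below, not a documented spec).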
ITEMS_BUFFER = 1024 * 1024
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096

have_lchmod = hasattr(os, 'lchmod')
linux = sys.platform == 'linux2'

class Archive(object):

    class DoesNotExist(Exception):
        pass

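    # Construction sketch (hypothetical wiring; the Store, Key and Cache
    # objects come from the surrounding package): Archive(store, key)
    # prepares a new, empty archive for writing, while
    # Archive(store, key, name='foo') loads an existing archive via load().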
    def __init__(self, store, key, name=None, cache=None):
        self.key = key
        self.store = store
        self.cache = cache
        self.items = StringIO()
        self.items_ids = []
        self.hard_links = {}
        self.stats = Statistics()
        if name:
            self.load(self.key.archive_hash(name))

    def load(self, id):
        self.id = id
        try:
            data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        if self.metadata['version'] != 1:
            raise Exception('Unknown archive metadata version')
        self.name = self.metadata['name']

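    # The fractional seconds are split off and parsed by hand below,
    # presumably because strptime's %f directive is unavailable on the older
    # Python versions this code still supports.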
    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata['time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

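    # iter_items streams the archive's item metadata: each metadata chunk is
    # fetched asynchronously, decrypted and verified, then fed to a streaming
    # msgpack Unpacker, which yields complete items to callback(). The
    # flush_rpc(counter, 10) call appears to cap in-flight requests at ten.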
    def iter_items(self, callback):
        unpacker = msgpack.Unpacker()
        counter = Counter(0)
        def cb(chunk, error, id):
            if error:
                raise error
            counter.dec()
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                callback(item)
        for id, size, csize in self.metadata['items']:
            # Limit the number of concurrent items requests to 10
            self.store.flush_rpc(counter, 10)
            counter.inc()
            self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)

    def add_item(self, item):
        self.items.write(msgpack.packb(item))
        if self.items.tell() > ITEMS_BUFFER:
            self.flush_items()

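    # flush_items cuts the buffered item stream into content-defined chunks
    # and stores all complete ones; the final, possibly short chunk is written
    # back into the buffer unless flush=True, presumably so it can keep
    # growing and be re-cut at a natural boundary on a later flush.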
    def flush_items(self, flush=False):
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        for chunk in chunks[:-1]:
            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunk),
                                                       chunk, self.stats))
        if flush or len(chunks) == 1:
            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunks[-1]),
                                                       chunks[-1], self.stats))
        else:
            self.items.write(chunks[-1])

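    # save finalizes the archive: buffered items are flushed, then the archive
    # metadata (format version, name, item chunk ids, provenance, timestamp)
    # is msgpack'd, encrypted and stored under the archive id before store and
    # cache are committed together.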
    def save(self, name, cache):
        self.id = self.key.archive_hash(name)
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.key.encrypt(msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()
        cache.commit()

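    # The refcount countdown below appears to be the trick that makes the
    # unique-size accounting work: each chunk's count reaches 1 exactly once
    # while this archive's references are consumed, so its size is attributed
    # as unique a single time; rollback() then discards all refcount changes.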
    def calc_stats(self, cache):
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        def cb(chunk, error, id):
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        count, _, _ = self.cache.chunks[id]
                        stats.update(size, csize, count == 1)
                        self.cache.chunks[id] = count - 1, size, csize
                    # Count each file item once, not once per chunk
                    stats.nfiles += 1
                except KeyError:
                    pass
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = Statistics()
        for id, size, csize in self.metadata['items']:
            stats.update(size, csize, self.cache.seen_chunk(id) == 1)
            self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
            self.cache.chunk_decref(id)
        self.store.flush_rpc()
        cache.rollback()
        return stats

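    # extract_item recreates a single archived item under dest. The leading
    # assert rejects absolute paths as a traversal guard; regular-file
    # contents arrive chunk by chunk via the async store and are written
    # sequentially by extract_cb.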
    def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True):
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, encode_filename(item['path']))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                def extract_cb(chunk, error, (id, i)):
                    # Check for transfer errors before creating the file
                    assert not error
                    if i == 0:
                        state['fd'] = open(path, 'wb')
                        start_cb(item)
                    data, hash = self.key.decrypt(chunk)
                    if self.key.id_hash(data) != id:
                        raise IntegrityError('chunk hash did not match')
                    state['fd'].write(data)
                    if i == n - 1:
                        state['fd'].close()
                        self.restore_attrs(path, item)
                state = {}
                n = len(item['chunks'])
                ## 0 chunks indicates an empty (0 bytes) file
                if n == 0:
                    open(path, 'wb').close()
                    start_cb(item)
                    self.restore_attrs(path, item)
                else:
                    for i, (id, size, csize) in enumerate(item['chunks']):
                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i))
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])

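    # restore_attrs reapplies xattrs, mode, ownership and mtime. The l*
    # variants (lchmod where available, lchown) operate on symlinks themselves
    # rather than their targets; chown failures (e.g. when not running as
    # root) are deliberately swallowed.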
    def restore_attrs(self, path, item, symlink=False):
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except (IOError, KeyError):
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))

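    # verify_file re-fetches and decrypts every chunk of a file, comparing
    # each plaintext hash against the stored chunk id, and reports a single
    # verdict through result().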
    def verify_file(self, item, start, result):
        def verify_chunk(chunk, error, (id, i)):
            if error:
                raise error
            if i == 0:
                start(item)
            data, hash = self.key.decrypt(chunk)
            if self.key.id_hash(data) != id:
                state['ok'] = False
                result(item, False)
            elif i == n - 1 and state['ok']:
                # Only report success if no earlier chunk failed
                result(item, True)
        state = {'ok': True}
        n = len(item['chunks'])
        if n == 0:
            start(item)
            result(item, True)
        else:
            for i, (id, size, csize) in enumerate(item['chunks']):
                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i))

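    # delete removes one archive: every file chunk referenced by its items is
    # decref'd, then each items metadata chunk itself, and finally the archive
    # metadata entry is deleted before store and cache are committed.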
    def delete(self, cache):
        def callback(chunk, error, id):
            assert not error
            data, items_hash = self.key.decrypt(chunk)
            if self.key.id_hash(data) != id:
                raise IntegrityError('Chunk checksum mismatch')
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    pass
            self.cache.chunk_decref(id)
        unpacker = msgpack.Unpacker()
        for id, size, csize in self.metadata['items']:
            self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
        self.store.flush_rpc()
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        self.store.commit()
        cache.commit()

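    # stat_attrs turns an lstat result into the item dict stored in the
    # archive, capturing extended attributes where the platform exposes them
    # (restricted to the user namespace on Linux).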
    def stat_attrs(self, st, path):
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item

    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

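    # process_file deduplicates on two levels: inodes already seen in this run
    # are recorded as hard links, and files whose stat data matches the
    # cache's record skip chunking entirely, merely incref'ing their known
    # chunks.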
    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path)
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                # for/else: only reached when no chunk was missing
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(store, key, cache=None):
        for id in list(store.list(NS_ARCHIVE_METADATA)):
            archive = Archive(store, key, cache=cache)
            archive.load(id)
            yield archive
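
# A minimal end-to-end sketch (hypothetical wiring: the Store, Key and Cache
# constructors named below are assumed, not defined in this file):
#
#     store = Store('/path/to/repository')   # hypothetical
#     key = Key(store)                       # hypothetical
#     cache = Cache(store, key)              # hypothetical
#     archive = Archive(store, key, cache=cache)
#     archive.process_file('/etc/hosts', os.lstat('/etc/hosts'), cache)
#     archive.save('monday', cache)
#     for a in Archive.list_archives(store, key, cache):
#         print a.name, a.ts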