# archive.py

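"""Archive handling for an encrypted, deduplicating backup store.

An archive is a msgpack-encoded stream of item dictionaries (files,
directories, symlinks, fifos, ...) that is chunked, encrypted and stored
as content-addressed chunks, plus a metadata chunk referenced from the
store's manifest.
"""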

from __future__ import with_statement
from datetime import datetime, timedelta
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from cStringIO import StringIO
from xattr import xattr, XATTR_NOFOLLOW

from ._speedups import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, \
    Counter, encode_filename, Statistics

ITEMS_BUFFER = 1024 * 1024
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096

have_lchmod = hasattr(os, 'lchmod')
linux = sys.platform == 'linux2'

class Archive(object):

    class DoesNotExist(Exception):
        pass

    def __init__(self, store, key, name=None, cache=None):
        self.key = key
        self.store = store
        self.cache = cache
        self.items = StringIO()
        self.items_ids = []
        self.hard_links = {}
        self.stats = Statistics()
        if name:
            manifest = Archive.read_manifest(self.store, self.key)
            try:
                info = manifest['archives'][name]
            except KeyError:
                raise Archive.DoesNotExist
            self.load(info['id'])

    @staticmethod
    def read_manifest(store, key):
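        """Fetch and decrypt the manifest, or return an empty one if the
        store does not have one yet"""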
        mid = store.meta['manifest']
        if not mid:
            return {'version': 1, 'archives': {}}
        mid = mid.decode('hex')
        data = key.decrypt(mid, store.get(mid))
        manifest = msgpack.unpackb(data)
        if manifest.get('version') != 1:
            raise ValueError('Invalid manifest version')
        return manifest

    def write_manifest(self, manifest):
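        """Replace the store's manifest, releasing the chunk holding the
        previous version"""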
        mid = self.store.meta['manifest']
        if mid:
            self.cache.chunk_decref(mid.decode('hex'))
        if manifest:
            data = msgpack.packb(manifest)
            mid = self.key.id_hash(data)
            self.cache.add_chunk(mid, data, self.stats)
            self.store.meta['manifest'] = mid.encode('hex')
        else:
            self.store.meta['manifest'] = ''

    def load(self, id):
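        """Load and validate the metadata of the archive with the given id"""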
        self.id = id
        data = self.key.decrypt(self.id, self.store.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata['version'] != 1:
            raise Exception('Unknown archive metadata version')
        self.name = self.metadata['name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata['time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, callback):
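        """Invoke callback(item) for every item in the archive, fetching
        and decrypting the item chunks asynchronously via the store"""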
        unpacker = msgpack.Unpacker()
        counter = Counter(0)
        def cb(chunk, error, id):
            if error:
                raise error
            counter.dec()
            data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                callback(item)
        for id in self.metadata['items']:
            # Limit the number of concurrent item requests to 10
            self.store.flush_rpc(counter, 10)
            counter.inc()
            self.store.get(id, callback=cb, callback_data=id)

    def add_item(self, item):
        self.items.write(msgpack.packb(item))
        if self.items.tell() > ITEMS_BUFFER:
            self.flush_items()

    def flush_items(self, flush=False):
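        """Chunkify the buffered items and store all but the last chunk.

        Unless flush is True the last chunk is written back into the
        buffer, so the chunker can re-cut it at a stable boundary once
        more items have been appended.
        """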
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            self.items.write(chunks[-1])

    def save(self, name, cache):
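        """Write any remaining items, store the archive metadata and
        register the new archive in the manifest"""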
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata)
        self.id = self.key.id_hash(data)
        cache.add_chunk(self.id, data, self.stats)
        manifest = Archive.read_manifest(self.store, self.key)
        assert name not in manifest['archives']
        manifest['archives'][name] = {'id': self.id, 'time': metadata['time']}
        self.write_manifest(manifest)
        self.store.commit()
        cache.commit()

    def calc_stats(self, cache):
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        def cb(chunk, error, id):
            assert not error
            data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        count, _, _ = self.cache.chunks[id]
                        stats.update(size, csize, count == 1)
                        self.cache.chunks[id] = count - 1, size, csize
                    # Count the file once, not once per chunk
                    stats.nfiles += 1
                except KeyError:
                    pass
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = Statistics()
        for id in self.metadata['items']:
            self.store.get(id, callback=cb, callback_data=id)
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        self.store.flush_rpc()
        cache.rollback()
        return stats

    def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True):
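        """Recreate the filesystem entry described by item under dest.

        Regular file contents are fetched chunk by chunk; start_cb(item)
        is invoked once extraction of a file's content begins.
        """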
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, encode_filename(item['path']))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                def extract_cb(chunk, error, (id, i)):
                    if i == 0:
                        state['fd'] = open(path, 'wb')
                        start_cb(item)
                    assert not error
                    data = self.key.decrypt(id, chunk)
                    state['fd'].write(data)
                    if i == n - 1:
                        state['fd'].close()
                        self.restore_attrs(path, item)
                state = {}
                n = len(item['chunks'])
                # 0 chunks indicates an empty (0 bytes) file
                if n == 0:
                    open(path, 'wb').close()
                    start_cb(item)
                    self.restore_attrs(path, item)
                else:
                    for i, (id, size, csize) in enumerate(item['chunks']):
                        self.store.get(id, callback=extract_cb, callback_data=(id, i))
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])

    def restore_attrs(self, path, item, symlink=False):
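        """Restore the xattrs, mode, ownership and mtime recorded in item"""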
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except (IOError, KeyError):
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))

    def verify_file(self, item, start, result):
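        """Fetch and decrypt all chunks of a file to verify their integrity.

        start(item) is called when verification begins and
        result(item, success) when it finishes.
        """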
        def verify_chunk(chunk, error, (id, i)):
            if error:
                # Report the failure only once per file
                if not state:
                    result(item, False)
                    state[True] = True
                return
            if i == 0:
                start(item)
            data = self.key.decrypt(id, chunk)
            if i == n - 1:
                result(item, True)
        state = {}
        n = len(item['chunks'])
        if n == 0:
            start(item)
            result(item, True)
        else:
            for i, (id, size, csize) in enumerate(item['chunks']):
                self.store.get(id, callback=verify_chunk, callback_data=(id, i))

    def delete(self, cache):
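        """Remove the archive from the manifest and drop the reference
        counts of all chunks it uses"""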
        def callback(chunk, error, id):
            assert not error
            data = self.key.decrypt(id, chunk)
            unpacker.feed(data)
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    pass
            self.cache.chunk_decref(id)
        unpacker = msgpack.Unpacker()
        for id in self.metadata['items']:
            self.store.get(id, callback=callback, callback_data=id)
        self.store.flush_rpc()
        self.cache.chunk_decref(self.id)
        manifest = Archive.read_manifest(self.store, self.key)
        assert self.name in manifest['archives']
        del manifest['archives'][self.name]
        self.write_manifest(manifest)
        self.store.commit()
        cache.commit()

    def stat_attrs(self, st, path):
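        """Map a stat result (plus xattrs) to the attribute dict stored
        with every archive item"""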
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item

    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
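        """Archive a regular file, reusing the chunks of a known, unchanged
        file from the cache when possible; hard links to an already seen
        inode are stored as references to the first occurrence"""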
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path)
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(store, key, cache=None):
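        """Yield an Archive instance for every archive in the manifest"""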
        manifest = Archive.read_manifest(store, key)
        for name, info in manifest['archives'].items():
            archive = Archive(store, key, cache=cache)
            archive.load(info['id'])
            yield archive
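

# A minimal, self-contained sketch of the streaming decode pattern used by
# iter_items(), calc_stats() and delete() above: msgpack.Unpacker is fed raw
# bytes as they arrive, in whatever sizes, and yields each complete item as
# soon as it can be decoded.
if __name__ == '__main__':
    demo_packed = ''.join(msgpack.packb({'path': p}) for p in ('a', 'b', 'c'))
    demo_unpacker = msgpack.Unpacker()
    for piece in (demo_packed[:5], demo_packed[5:]):  # arbitrary split points
        demo_unpacker.feed(piece)
        for demo_item in demo_unpacker:
            print demo_item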