# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from os.path import dirname
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. from . import NS_ARCHIVE_METADATA, NS_CHUNK
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Size parameters for the rolling-hash chunker (see _speedups.chunkify).
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096
# os.lchmod only exists on some platforms (e.g. BSD/OS X); restore_attrs
# falls back to os.chmod where it is missing.
have_lchmod = hasattr(os, 'lchmod')
# sys.platform is 'linux2' on Python 2 Linux builds.
linux = sys.platform == 'linux2'
class Archive(object):
    """A named backup archive: metadata plus a stream of item dicts stored as chunks."""

    class DoesNotExist(Exception):
        # Raised by load() when the archive id is missing from the store.
        pass
    def __init__(self, store, key, name=None, cache=None):
        """Bind to a store/key pair; if *name* is given, load that archive's metadata."""
        self.key = key
        self.store = store
        self.cache = cache
        # Buffer of packed item dicts plus the chunk refs they borrow; flushed
        # to the store whenever the current directory prefix changes.
        self.items = ''
        self.items_refs = []
        self.items_prefix = ''
        self.items_ids = []
        # (st_ino, st_dev) -> first seen path, used to detect hard links.
        self.hard_links = {}
        if name:
            self.load(self.key.archive_hash(name))
    def load(self, id):
        """Fetch, decrypt and unpack the archive metadata stored under *id*.

        Raises Archive.DoesNotExist if the store has no such entry.
        """
        self.id = id
        try:
            data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        # Only metadata format version 1 is understood.
        assert self.metadata['version'] == 1
  40. @property
  41. def ts(self):
  42. """Timestamp of archive creation in UTC"""
  43. t, f = self.metadata['time'].split('.', 1)
  44. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
    def get_items(self):
        """Yield the item dicts of this archive, decoding items chunks one by one."""
        unpacker = msgpack.Unpacker()
        for id, size, csize in self.metadata['items']:
            data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
            # Guard against corrupted or substituted chunks.
            assert self.key.id_hash(data) == id
            unpacker.feed(data)
            for item in unpacker:
                yield item
    def add_item(self, item, refs=None):
        """Queue one item for storage.

        The buffer is flushed as a single chunk whenever the directory prefix
        changes, so items of one directory tend to share an items chunk.
        *refs* are chunk ids the item holds references to; they are released
        in flush_items() if the items chunk turns out to be a duplicate.
        """
        data = msgpack.packb(item)
        prefix = dirname(item['path'])
        if self.items_prefix and self.items_prefix != prefix:
            self.flush_items()
        if refs:
            self.items_refs += refs
        self.items += data
        self.items_prefix = prefix
    def flush_items(self):
        """Write the buffered items as one chunk and reset the buffer."""
        if not self.items:
            return
        id = self.key.id_hash(self.items)
        if self.cache.seen_chunk(id):
            # Identical items chunk already stored: bump its refcount and
            # release the per-item refs the buffered items were holding.
            self.items_ids.append(self.cache.chunk_incref(id))
            for id in self.items_refs:
                self.cache.chunk_decref(id)
        else:
            self.items_ids.append(self.cache.add_chunk(id, self.items))
        self.items = ''
        self.items_refs = []
        self.items_prefix = ''
  75. def save(self, name, cache):
  76. self.id = self.key.archive_hash(name)
  77. self.flush_items()
  78. metadata = {
  79. 'version': 1,
  80. 'name': name,
  81. 'items': self.items_ids,
  82. 'cmdline': sys.argv,
  83. 'hostname': socket.gethostname(),
  84. 'username': getuser(),
  85. 'time': datetime.utcnow().isoformat(),
  86. }
  87. data, self.hash = self.key.encrypt(msgpack.packb(metadata))
  88. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  89. self.store.commit()
  90. cache.commit()
  91. def get_chunks(self):
  92. for item in self.get_items():
  93. try:
  94. for chunk in item['chunks']:
  95. yield chunk
  96. except KeyError:
  97. pass
  98. def stats(self, cache):
  99. # This function is a bit evil since it abuses the cache to calculate
  100. # the stats. The cache transaction must be rolled back afterwards
  101. unpacker = msgpack.Unpacker()
  102. cache.begin_txn()
  103. osize = zsize = usize = 0
  104. for id, size, csize in self.metadata['items']:
  105. osize += size
  106. zsize += csize
  107. unique = self.cache.seen_chunk(id) == 1
  108. if unique:
  109. usize += csize
  110. data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
  111. assert self.key.id_hash(data) == id
  112. unpacker.feed(data)
  113. for item in unpacker:
  114. try:
  115. for id, size, csize in item['chunks']:
  116. osize += size
  117. zsize += csize
  118. if unique and self.cache.seen_chunk(id) == 1:
  119. usize += csize
  120. except KeyError:
  121. pass
  122. cache.rollback()
  123. return osize, zsize, usize
    def extract_item(self, item, dest=None, start_cb=None):
        """Recreate *item* (dir, fifo, symlink, hard link or regular file) under *dest*.

        Regular-file data is fetched asynchronously via the store: extract_cb
        fires once per chunk, writes it, and closes/finalizes the file after
        the last one.  *start_cb* is invoked once when extraction of the item
        actually begins.
        """
        dest = dest or os.getcwdu()
        # NOTE(review): unused -- presumably a placeholder for deferred
        # directory-attribute restoration; confirm before removing.
        dir_stat_queue = []
        # Stored paths must be relative and free of drive/root prefixes.
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'].decode('utf-8'))
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                def extract_cb(chunk, error, (id, i, last)):
                    # First chunk: report the file as started.
                    if i==0:
                        start_cb(item)
                    assert not error
                    data, hash = self.key.decrypt(chunk)
                    if self.key.id_hash(data) != id:
                        raise IntegrityError('chunk hash did not match')
                    fd.write(data)
                    if last:
                        # Close before restoring attrs so mtime isn't clobbered.
                        fd.close()
                        self.restore_attrs(path, item)
                fd = open(path, 'wb')
                n = len(item['chunks'])
                if n == 0:
                    # Empty file: no callbacks will fire, finish inline.
                    start_cb(item)
                    self.restore_attrs(path, item)
                    fd.close()
                else:
                    for i, (id, size, csize) in enumerate(item['chunks']):
                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1))
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
    def restore_attrs(self, path, item, symlink=False):
        """Apply xattrs, mode, ownership and mtime from *item* onto *path*.

        With symlink=True, operations that would follow the link (chmod
        without lchmod, utime) are skipped.
        """
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    # NOTE(review): presumably raised by the xattr lib for
                    # unsupported keys -- best-effort, confirm.
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        # Prefer the stored user/group names; fall back to the numeric ids
        # when the name is unknown on this system (lookup returns a falsy value).
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            # Typically EPERM when not running as root; ownership is best-effort.
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))
    def verify_file(self, item, start, result):
        """Verify every chunk of *item* against its id hash, reporting via callbacks.

        *start* is called once before the first chunk is checked; *result* is
        called with (item, ok) -- False as soon as a hash mismatch is seen,
        True after the last chunk verifies cleanly.
        """
        def verify_chunk(chunk, error, (id, i, last)):
            if i == 0:
                start(item)
            assert not error
            data, hash = self.key.decrypt(chunk)
            if self.key.id_hash(data) != id:
                result(item, False)
            elif last:
                result(item, True)
        n = len(item['chunks'])
        if n == 0:
            # No data chunks: trivially valid.
            start(item)
            result(item, True)
        else:
            for i, (id, size, csize) in enumerate(item['chunks']):
                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
  218. def delete(self, cache):
  219. unpacker = msgpack.Unpacker()
  220. for id, size, csize in self.metadata['items']:
  221. if self.cache.seen_chunk(id) == 1:
  222. data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
  223. assert self.key.id_hash(data) == id
  224. unpacker.feed(data)
  225. for item in unpacker:
  226. try:
  227. for chunk_id, size, csize in item['chunks']:
  228. self.cache.chunk_decref(chunk_id)
  229. except KeyError:
  230. pass
  231. self.cache.chunk_decref(id)
  232. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  233. self.store.commit()
  234. cache.commit()
    def stat_attrs(self, st, path):
        """Build the common item fields (mode, ownership, mtime, xattrs) from *st*."""
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                # NOTE(review): matches any key starting with 'user', not only
                # the 'user.' namespace -- confirm this is intended.
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            # Filesystem without xattr support: skip silently.
            pass
        return item
  255. def process_dir(self, path, st):
  256. item = {'path': path.lstrip('/\\:')}
  257. item.update(self.stat_attrs(st, path))
  258. self.add_item(item)
  259. def process_fifo(self, path, st):
  260. item = {'path': path.lstrip('/\\:')}
  261. item.update(self.stat_attrs(st, path))
  262. self.add_item(item)
  263. def process_symlink(self, path, st):
  264. source = os.readlink(path)
  265. item = {'path': path.lstrip('/\\:'), 'source': source}
  266. item.update(self.stat_attrs(st, path))
  267. self.add_item(item)
  268. def process_file(self, path, st, cache):
  269. safe_path = path.lstrip('/\\:')
  270. # Is it a hard link?
  271. if st.st_nlink > 1:
  272. source = self.hard_links.get((st.st_ino, st.st_dev))
  273. if (st.st_ino, st.st_dev) in self.hard_links:
  274. item = self.stat_attrs(st, path)
  275. item.update({'path': safe_path, 'source': source})
  276. self.add_item(item)
  277. return
  278. else:
  279. self.hard_links[st.st_ino, st.st_dev] = safe_path
  280. path_hash = self.key.id_hash(path.encode('utf-8'))
  281. ids = cache.file_known_and_unchanged(path_hash, st)
  282. chunks = None
  283. if ids is not None:
  284. # Make sure all ids are available
  285. for id in ids:
  286. if not cache.seen_chunk(id):
  287. break
  288. else:
  289. chunks = [cache.chunk_incref(id) for id in ids]
  290. # Only chunkify the file if needed
  291. if chunks is None:
  292. with open(path, 'rb') as fd:
  293. chunks = []
  294. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  295. self.key.chunk_seed):
  296. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
  297. ids = [id for id, _, _ in chunks]
  298. cache.memorize_file(path_hash, st, ids)
  299. item = {'path': safe_path, 'chunks': chunks}
  300. item.update(self.stat_attrs(st, path))
  301. self.add_item(item, ids)
  302. @staticmethod
  303. def list_archives(store, key):
  304. for id in list(store.list(NS_ARCHIVE_METADATA)):
  305. archive = Archive(store, key)
  306. archive.load(id)
  307. yield archive