archive.py

from __future__ import with_statement
from datetime import datetime, timedelta
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from xattr import xattr, XATTR_NOFOLLOW

from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK, \
    PACKET_ARCHIVE_METADATA, PACKET_ARCHIVE_ITEMS, PACKET_ARCHIVE_CHUNKS, PACKET_CHUNK
from ._speedups import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError

# Target chunk size and rolling-hash window passed to the chunker
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096

have_lchmod = hasattr(os, 'lchmod')
linux = sys.platform == 'linux2'

class Archive(object):

    class DoesNotExist(Exception):
        pass

    def __init__(self, store, keychain, name=None):
        self.keychain = keychain
        self.store = store
        self.items = []
        self.items_ids = []
        self.hard_links = {}
        if name:
            self.load(self.keychain.id_hash(name))

    def load(self, id):
        self.id = id
        try:
            kind, data, self.hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        assert kind == PACKET_ARCHIVE_METADATA
        self.metadata = msgpack.unpackb(data)
        assert self.metadata['version'] == 1
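
    # Usage sketch (store and keychain stand for an open Store and a
    # loaded Keychain; setting those up happens in the calling code):
    #
    #   archive = Archive(store, keychain, name='home-2010-05-09')
    #   print archive.metadata['hostname'], archive.ts
    #
    # Passing name= hashes it with keychain.id_hash() and load()s the
    # matching metadata packet; Archive.DoesNotExist is raised if there
    # is no archive under that name.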

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata['time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
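
    # metadata['time'] comes from datetime.utcnow().isoformat(), e.g.
    # '2010-05-09T14:29:41.374923'.  The fraction is parsed separately,
    # presumably because strptime's %f is unavailable before Python 2.6.
    # Note: isoformat() omits the fraction entirely when microsecond is 0,
    # which the split('.', 1) above would not survive.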

    def get_chunks(self):
        for id in self.metadata['chunks_ids']:
            magic, data, hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, id))
            assert magic == PACKET_ARCHIVE_CHUNKS
            assert hash == id
            chunks = msgpack.unpackb(data)
            for chunk in chunks:
                yield chunk

    def get_items(self):
        for id in self.metadata['items_ids']:
            magic, data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, id))
            assert magic == PACKET_ARCHIVE_ITEMS
            assert items_hash == id
            items = msgpack.unpackb(data)
            for item in items:
                yield item

    def add_item(self, item):
        self.items.append(item)
        # Buffer item metadata and write it out in bounded msgpack packets
        # so large archives do not hold every item dict in memory at once
        if len(self.items) > 100000:
            self.flush_items()

    def flush_items(self):
        data, hash = self.keychain.encrypt(PACKET_ARCHIVE_ITEMS, msgpack.packb(self.items))
        self.store.put(NS_ARCHIVE_ITEMS, hash, data)
        self.items_ids.append(hash)
        self.items = []

    def save_chunks(self, cache):
        chunks = []
        ids = []
        def flush(chunks):
            data, hash = self.keychain.encrypt(PACKET_ARCHIVE_CHUNKS, msgpack.packb(chunks))
            self.store.put(NS_ARCHIVE_CHUNKS, hash, data)
            ids.append(hash)
        # Counts above 1,000,000 appear to mark chunks referenced by the
        # archive being created (see the Cache refcounting); only those
        # belong in this archive's chunk index
        for id, (count, size) in cache.chunks.iteritems():
            if count > 1000000:
                chunks.append((id, size))
                if len(chunks) > 100000:
                    flush(chunks)
                    chunks = []
        flush(chunks)
        return ids

    def save(self, name, cache):
        self.id = self.keychain.id_hash(name)
        chunks_ids = self.save_chunks(cache)
        self.flush_items()
        metadata = {
            'version': 1,
            'name': name,
            'chunks_ids': chunks_ids,
            'items_ids': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.keychain.encrypt(PACKET_ARCHIVE_METADATA, msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()
        cache.commit()
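
    # Creation sketch (store, keychain, cache and the directory walk are
    # illustrative; the real driver lives in the command line front end):
    #
    #   archive = Archive(store, keychain)
    #   for path, st in walk('/home'):   # hypothetical walker
    #       archive.process_file(path, st, cache)
    #   archive.save('home-2010-05-09', cache)
    #
    # save() flushes the chunk and item indexes, stores the metadata
    # packet under id_hash(name), then commits both store and cache.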

    def stats(self, cache):
        # osize: original byte size of all regular files (hard link
        # duplicates excluded); csize: bytes of all referenced chunks as
        # stored; usize: bytes of chunks used by no other archive (refcount 1)
        osize = csize = usize = 0
        for item in self.get_items():
            if stat.S_ISREG(item['mode']) and 'source' not in item:
                osize += item['size']
        for id, size in self.get_chunks():
            csize += size
            if cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize
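
    # For example, an archive whose chunks are all shared with earlier
    # archives reports usize == 0: deleting it would free nothing.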

    def extract_item(self, item, dest=None, start_cb=None):
        dest = dest or os.getcwdu()
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'].decode('utf-8'))
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                def extract_cb(chunk, error, (id, i, last)):
                    # Invoked once per chunk as the queued store.get()s complete
                    if i == 0:
                        start_cb(item)
                    assert not error
                    magic, data, hash = self.keychain.decrypt(chunk)
                    assert magic == PACKET_CHUNK
                    if self.keychain.id_hash(data) != id:
                        raise IntegrityError('chunk hash did not match')
                    fd.write(data)
                    if last:
                        self.restore_attrs(path, item)
                        fd.close()
                fd = open(path, 'wb')
                n = len(item['chunks'])
                if n == 0:
                    start_cb(item)
                    self.restore_attrs(path, item)
                    fd.close()
                else:
                    for i, id in enumerate(item['chunks']):
                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i == n - 1))
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
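
    # Extraction sketch (start_cb defaults to None, but the regular-file
    # path above calls it unconditionally, so pass one):
    #
    #   def start(item):
    #       print item['path']
    #   for item in archive.get_items():
    #       archive.extract_item(item, dest='/tmp/restore', start_cb=start)
    #
    # Regular file data arrives through extract_cb as the queued
    # store.get() callbacks fire; attributes are restored with the last chunk.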

    def restore_attrs(self, path, item, symlink=False):
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))

    def verify_file(self, item, start, result):
        def verify_chunk(chunk, error, (id, i, last)):
            if i == 0:
                start(item)
            assert not error
            magic, data, hash = self.keychain.decrypt(chunk)
            assert magic == PACKET_CHUNK
            if self.keychain.id_hash(data) != id:
                result(item, False)
            elif last:
                result(item, True)
        n = len(item['chunks'])
        if n == 0:
            start(item)
            result(item, True)
        else:
            for i, id in enumerate(item['chunks']):
                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i == n - 1))
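
    # Verification sketch:
    #
    #   def start(item):
    #       pass
    #   def result(item, success):
    #       if not success:
    #           print 'FAILED', item['path']
    #   for item in archive.get_items():
    #       if stat.S_ISREG(item['mode']) and 'source' not in item:
    #           archive.verify_file(item, start, result)
    #
    # Caveat visible above: a corrupt middle chunk reports (item, False),
    # but an intact final chunk will still report (item, True) afterwards.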

    def delete(self, cache):
        for id, size in self.get_chunks():
            cache.chunk_decref(id)
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        for id in self.metadata['chunks_ids']:
            self.store.delete(NS_ARCHIVE_CHUNKS, id)
        for id in self.metadata['items_ids']:
            self.store.delete(NS_ARCHIVE_ITEMS, id)
        self.store.commit()
        cache.commit()

    def stat_attrs(self, st, path):
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'atime': st.st_atime, 'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item

    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                # Inode already stored; just record a reference to it
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.keychain.id_hash(path.encode('utf-8'))
        ids, size = cache.file_known_and_unchanged(path_hash, st)
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    ids = None
                    break
            else:
                # for/else: only runs when no chunk was missing
                for id in ids:
                    cache.chunk_incref(id)
        # Only chunkify the file if needed
        if ids is None:
            with open(path, 'rb') as fd:
                size = 0
                ids = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.keychain.get_chunkify_seed()):
                    ids.append(cache.add_chunk(self.keychain.id_hash(chunk), chunk))
                    size += len(chunk)
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': ids, 'size': size}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
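
    # Deduplication at a glance: file_known_and_unchanged() returns the
    # chunk ids recorded for this path when its stat data still matches,
    # so an unmodified file costs only refcount increments.  Modified or
    # new files are cut into content-defined chunks:
    #
    #   for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE, seed):
    #       ids.append(cache.add_chunk(keychain.id_hash(chunk), chunk))
    #
    # where add_chunk() is expected to store a chunk only the first time
    # its id is seen and merely bump the refcount for duplicates.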

    @staticmethod
    def list_archives(store, keychain):
        for id in list(store.list(NS_ARCHIVE_METADATA)):
            archive = Archive(store, keychain)
            archive.load(id)
            yield archive
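
# Listing sketch:
#
#   for archive in Archive.list_archives(store, keychain):
#       print archive.metadata['name'], archive.ts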