# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from xattr import xattr, XATTR_NOFOLLOW
  10. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK, \
  11. PACKET_ARCHIVE_METADATA, PACKET_ARCHIVE_ITEMS, PACKET_ARCHIVE_CHUNKS, PACKET_CHUNK
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
CHUNK_SIZE = 64 * 1024      # target chunk size passed to the chunkifier
WINDOW_SIZE = 4096          # rolling-hash window size passed to the chunkifier
# os.lchmod is only available on some platforms (e.g. BSD / OS X)
have_lchmod = hasattr(os, 'lchmod')
# 'linux2' is the sys.platform value for Linux on Python 2
linux = sys.platform == 'linux2'
class Archive(object):
    """A named backup archive: encrypted metadata, item and chunk packets
    kept in a key/value store under the NS_ARCHIVE_* namespaces."""

    class DoesNotExist(Exception):
        """Raised when no archive with the requested id exists in the store."""
        pass
  21. def __init__(self, store, keychain, name=None):
  22. self.keychain = keychain
  23. self.store = store
  24. self.items = []
  25. self.items_ids = []
  26. self.hard_links = {}
  27. if name:
  28. self.load(self.keychain.id_hash(name))
  29. def load(self, id):
  30. self.id = id
  31. try:
  32. kind, data, self.hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  33. except self.store.DoesNotExist:
  34. raise self.DoesNotExist
  35. assert kind == PACKET_ARCHIVE_METADATA
  36. self.metadata = msgpack.unpackb(data)
  37. assert self.metadata['version'] == 1
  38. @property
  39. def ts(self):
  40. """Timestamp of archive creation in UTC"""
  41. t, f = self.metadata['time'].split('.', 1)
  42. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  43. def get_chunks(self):
  44. for id in self.metadata['chunks_ids']:
  45. magic, data, hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, id))
  46. assert magic == PACKET_ARCHIVE_CHUNKS
  47. assert hash == id
  48. chunks = msgpack.unpackb(data)
  49. for chunk in chunks:
  50. yield chunk
  51. def get_items(self):
  52. for id in self.metadata['items_ids']:
  53. magic, data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, id))
  54. assert magic == PACKET_ARCHIVE_ITEMS
  55. assert items_hash == id
  56. items = msgpack.unpackb(data)
  57. for item in items:
  58. yield item
  59. def add_item(self, item):
  60. self.items.append(item)
  61. if len(self.items) > 100000:
  62. self.flush_items()
  63. def flush_items(self):
  64. data, hash = self.keychain.encrypt(PACKET_ARCHIVE_ITEMS, msgpack.packb(self.items))
  65. self.store.put(NS_ARCHIVE_ITEMS, hash, data)
  66. self.items_ids.append(hash)
  67. self.items = []
    def save_chunks(self, cache):
        """Write the archive's (id, size) chunk list to the store in batches.

        Returns the list of packet ids that were written.
        """
        chunks = []
        ids = []
        def flush(chunks):
            # Pack and encrypt one batch, record the resulting packet id.
            data, hash = self.keychain.encrypt(PACKET_ARCHIVE_CHUNKS, msgpack.packb(chunks))
            self.store.put(NS_ARCHIVE_CHUNKS, hash, data)
            ids.append(hash)
        for id, (count, size) in cache.chunks.iteritems():
            # NOTE(review): counts above 1000000 appear to flag chunks that
            # belong to the archive currently being written — presumably the
            # cache offsets refcounts to mark them; confirm against the Cache
            # implementation before changing this threshold.
            if count > 1000000:
                chunks.append((id, size))
            # Flush in batches to bound packet size / memory.
            if len(chunks) > 100000:
                flush(chunks)
                chunks = []
        flush(chunks)
        return ids
  83. def save(self, name, cache):
  84. self.id = self.keychain.id_hash(name)
  85. chunks_ids = self.save_chunks(cache)
  86. self.flush_items()
  87. metadata = {
  88. 'version': 1,
  89. 'name': name,
  90. 'chunks_ids': chunks_ids,
  91. 'items_ids': self.items_ids,
  92. 'cmdline': sys.argv,
  93. 'hostname': socket.gethostname(),
  94. 'username': getuser(),
  95. 'time': datetime.utcnow().isoformat(),
  96. }
  97. data, self.hash = self.keychain.encrypt(PACKET_ARCHIVE_METADATA, msgpack.packb(metadata))
  98. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  99. self.store.commit()
  100. cache.commit()
  101. def stats(self, cache):
  102. osize = csize = usize = 0
  103. for item in self.get_items():
  104. if stat.S_ISREG(item['mode']) and not 'source' in item:
  105. osize += item['size']
  106. for id, size in self.get_chunks():
  107. csize += size
  108. if cache.seen_chunk(id) == 1:
  109. usize += size
  110. return osize, csize, usize
  111. def extract_item(self, item, dest=None):
  112. dest = dest or os.getcwdu()
  113. dir_stat_queue = []
  114. assert item['path'][0] not in ('/', '\\', ':')
  115. path = os.path.join(dest, item['path'].decode('utf-8'))
  116. mode = item['mode']
  117. if stat.S_ISDIR(mode):
  118. if not os.path.exists(path):
  119. os.makedirs(path)
  120. self.restore_attrs(path, item)
  121. elif stat.S_ISFIFO(mode):
  122. if not os.path.exists(os.path.dirname(path)):
  123. os.makedirs(os.path.dirname(path))
  124. os.mkfifo(path)
  125. self.restore_attrs(path, item)
  126. elif stat.S_ISLNK(mode):
  127. if not os.path.exists(os.path.dirname(path)):
  128. os.makedirs(os.path.dirname(path))
  129. source = item['source']
  130. if os.path.exists(path):
  131. os.unlink(path)
  132. os.symlink(source, path)
  133. self.restore_attrs(path, item, symlink=True)
  134. elif stat.S_ISREG(mode):
  135. if not os.path.exists(os.path.dirname(path)):
  136. os.makedirs(os.path.dirname(path))
  137. # Hard link?
  138. if 'source' in item:
  139. source = os.path.join(dest, item['source'].decode('utf-8'))
  140. if os.path.exists(path):
  141. os.unlink(path)
  142. os.link(source, path)
  143. else:
  144. with open(path, 'wb') as fd:
  145. for id in item['chunks']:
  146. try:
  147. magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
  148. assert magic == PACKET_CHUNK
  149. if self.keychain.id_hash(data) != id:
  150. raise IntegrityError('chunk hash did not match')
  151. fd.write(data)
  152. except ValueError:
  153. raise Exception('Invalid chunk checksum')
  154. self.restore_attrs(path, item)
  155. else:
  156. raise Exception('Unknown archive item type %r' % item['mode'])
    def restore_attrs(self, path, item, symlink=False):
        """Apply stored metadata (xattrs, mode, ownership, timestamps) to *path*.

        *symlink* is True when *path* is a symbolic link, in which case mode
        and timestamps are only touched where the platform allows it.
        """
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    # NOTE(review): presumably raised for keys the platform
                    # rejects; restore is best-effort — confirm with xattr docs.
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            # Without lchmod we must not chmod through a symlink.
            os.chmod(path, item['mode'])
        # Prefer the symbolic user/group names; fall back to the stored
        # numeric ids when the name does not resolve on this system.
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            # Typically fails when not running as root; ownership is best-effort.
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))
  179. def verify_file(self, item):
  180. for id in item['chunks']:
  181. try:
  182. magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
  183. assert magic == PACKET_CHUNK
  184. if self.keychain.id_hash(data) != id:
  185. raise IntegrityError('chunk id did not match')
  186. except IntegrityError:
  187. return False
  188. return True
  189. def delete(self, cache):
  190. for id, size in self.get_chunks():
  191. cache.chunk_decref(id)
  192. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  193. for id in self.metadata['chunks_ids']:
  194. self.store.delete(NS_ARCHIVE_CHUNKS, id)
  195. for id in self.metadata['items_ids']:
  196. self.store.delete(NS_ARCHIVE_ITEMS, id)
  197. self.store.commit()
  198. cache.commit()
  199. def stat_attrs(self, st, path):
  200. item = {
  201. 'mode': st.st_mode,
  202. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  203. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  204. 'atime': st.st_atime, 'mtime': st.st_mtime,
  205. }
  206. try:
  207. xa = xattr(path, XATTR_NOFOLLOW)
  208. xattrs = {}
  209. for key in xa:
  210. # Only store the user namespace on Linux
  211. if linux and not key.startswith('user'):
  212. continue
  213. xattrs[key] = xa[key]
  214. if xattrs:
  215. item['xattrs'] = xattrs
  216. except IOError:
  217. pass
  218. return item
  219. def process_dir(self, path, st):
  220. item = {'path': path.lstrip('/\\:')}
  221. item.update(self.stat_attrs(st, path))
  222. self.add_item(item)
  223. def process_fifo(self, path, st):
  224. item = {'path': path.lstrip('/\\:')}
  225. item.update(self.stat_attrs(st, path))
  226. self.add_item(item)
  227. def process_symlink(self, path, st):
  228. source = os.readlink(path)
  229. item = {'path': path.lstrip('/\\:'), 'source': source}
  230. item.update(self.stat_attrs(st, path))
  231. self.add_item(item)
  232. def process_file(self, path, st, cache):
  233. safe_path = path.lstrip('/\\:')
  234. # Is it a hard link?
  235. if st.st_nlink > 1:
  236. source = self.hard_links.get((st.st_ino, st.st_dev))
  237. if (st.st_ino, st.st_dev) in self.hard_links:
  238. item = self.stat_attrs(st, path)
  239. item.update({'path': safe_path, 'source': source})
  240. self.add_item(item)
  241. return
  242. else:
  243. self.hard_links[st.st_ino, st.st_dev] = safe_path
  244. path_hash = self.keychain.id_hash(path.encode('utf-8'))
  245. ids, size = cache.file_known_and_unchanged(path_hash, st)
  246. if ids is not None:
  247. # Make sure all ids are available
  248. for id in ids:
  249. if not cache.seen_chunk(id):
  250. ids = None
  251. break
  252. else:
  253. for id in ids:
  254. cache.chunk_incref(id)
  255. # Only chunkify the file if needed
  256. if ids is None:
  257. with open(path, 'rb') as fd:
  258. size = 0
  259. ids = []
  260. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  261. self.keychain.get_chunkify_seed()):
  262. ids.append(cache.add_chunk(self.keychain.id_hash(chunk), chunk))
  263. size += len(chunk)
  264. cache.memorize_file(path_hash, st, ids)
  265. item = {'path': safe_path, 'chunks': ids, 'size': size}
  266. item.update(self.stat_attrs(st, path))
  267. self.add_item(item)
  268. @staticmethod
  269. def list_archives(store, keychain):
  270. for id in list(store.list(NS_ARCHIVE_METADATA)):
  271. archive = Archive(store, keychain)
  272. archive.load(id)
  273. yield archive