archive.py

from datetime import datetime
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from xattr import xattr, XATTR_NOFOLLOW

from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
from ._speedups import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError

CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096

have_lchmod = hasattr(os, 'lchmod')
linux = sys.platform == 'linux2'
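

# An archive is stored as three kinds of records: a metadata blob under
# NS_ARCHIVE_METADATA, one or more encrypted item lists under NS_ARCHIVE_ITEMS,
# and a chunk index under NS_ARCHIVE_CHUNKS. The file data itself lives in
# NS_CHUNK records referenced from the items.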
class Archive(object):

    class DoesNotExist(Exception):
        pass

    def __init__(self, store, keychain, name=None):
        self.keychain = keychain
        self.store = store
        self.items = []
        self.items_ids = []
        self.hard_links = {}
        if name:
            self.load(self.keychain.id_hash(name))
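
    # Fetch and decrypt the metadata record for the given archive id and
    # check the format version.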
    def load(self, id):
        self.id = id
        try:
            data, self.hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        assert self.metadata['version'] == 1
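
    # Return the archive's (id, size) chunk list, checking it against the
    # chunks_hash recorded in the metadata.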
    def get_chunks(self):
        data, chunks_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
        chunks = msgpack.unpackb(data)
        assert chunks['version'] == 1
        assert self.metadata['chunks_hash'] == chunks_hash
        return chunks['chunks']
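
    # Yield the archive's items lazily, decrypting and verifying one stored
    # item list at a time.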
    def get_items(self):
        for id in self.metadata['items_ids']:
            data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, id))
            assert items_hash == id
            items = msgpack.unpackb(data)
            assert items['version'] == 1
            for item in items['items']:
                yield item
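
    # Items are buffered in memory and flushed as encrypted item lists once
    # more than 100000 have accumulated; each list is stored under its own
    # hash, which is collected in items_ids for the metadata record.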
    def add_item(self, item):
        self.items.append(item)
        if len(self.items) > 100000:
            self.flush_items()

    def flush_items(self):
        items = {'version': 1, 'items': self.items}
        data, items_hash = self.keychain.encrypt_read(msgpack.packb(items))
        self.store.put(NS_ARCHIVE_ITEMS, items_hash, data)
        self.items = []
        self.items_ids.append(items_hash)
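
    # Write the archive's chunk index (taken from the cache's chunk counts),
    # flush any buffered items, then store the metadata record and commit
    # the store transaction.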
    def save(self, name, cache):
        self.id = self.keychain.id_hash(name)
        self.chunks = [(id, size) for (id, (count, size)) in cache.chunk_counts.iteritems() if count > 1000000]
        chunks = {'version': 1, 'chunks': self.chunks}
        data, chunks_hash = self.keychain.encrypt_create(msgpack.packb(chunks))
        self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
        self.flush_items()
        metadata = {
            'version': 1,
            'name': name,
            'chunks_hash': chunks_hash,
            'items_ids': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.keychain.encrypt_read(msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()
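
    # Compute (osize, csize, usize): osize sums the logical size of regular,
    # non-hardlink files, csize sums the stored size of all chunks, and usize
    # only those chunks referenced exactly once in the cache.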
    def stats(self, cache):
        osize = csize = usize = 0
        for item in self.get_items():
            if stat.S_ISREG(item['mode']) and 'source' not in item:
                osize += item['size']
        for id, size in self.get_chunks():
            csize += size
            if cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize
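
    # Recreate one archive item (directory, fifo, symlink, hard link or
    # regular file) below dest, fetching and verifying its chunks from the store.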
    def extract_item(self, item, dest=None):
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    for id in item['chunks']:
                        try:
                            data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
                            if self.keychain.id_hash(data) != id:
                                raise IntegrityError('chunk hash did not match')
                            fd.write(data)
                        except ValueError:
                            raise Exception('Invalid chunk checksum')
                self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
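
    # Restore xattrs, mode, ownership and timestamps on an extracted path;
    # mode and mtime handling is adjusted for symlinks, which cannot always
    # be modified directly.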
    def restore_attrs(self, path, item, symlink=False):
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))
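
    # Check that every chunk of a file item decrypts and matches its id.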
    def verify_file(self, item):
        for id in item['chunks']:
            try:
                data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
                if self.keychain.id_hash(data) != id:
                    raise IntegrityError('chunk id did not match')
            except IntegrityError:
                return False
        return True
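
    # Remove the archive's records from the store and drop one chunk
    # reference per archive chunk before committing and saving the cache.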
    def delete(self, cache):
        self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
        self.store.delete(NS_ARCHIVE_ITEMS, self.id)
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        for id, size in self.get_chunks():
            cache.chunk_decref(id)
        self.store.commit()
        cache.save()
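
    # Collect the item attributes shared by all entry types from a stat
    # result, plus extended attributes (limited to the user namespace on Linux).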
    def stat_attrs(self, st, path):
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'atime': st.st_atime, 'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item
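
    # The process_* helpers turn filesystem entries into archive items;
    # leading '/', '\\' and ':' are stripped so stored paths are always relative.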
    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
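
    # Regular files: additional hard links to an inode already seen become
    # 'source' items, files the cache knows to be unchanged reuse their
    # recorded chunk ids (after checking the chunks still exist), and
    # everything else is chunkified with new chunks added through the cache.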
    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                self.add_item({'mode': st.st_mode,
                               'path': safe_path, 'source': source})
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.keychain.id_hash(path.encode('utf-8'))
        ids, size = cache.file_known_and_unchanged(path_hash, st)
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    ids = None
                    break
            else:
                for id in ids:
                    cache.chunk_incref(id)
        # Only chunkify the file if needed
        if ids is None:
            with open(path, 'rb') as fd:
                size = 0
                ids = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.keychain.get_chunkify_seed()):
                    ids.append(cache.add_chunk(self.keychain.id_hash(chunk), chunk))
                    size += len(chunk)
            cache.memorize_file_chunks(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': ids, 'size': size}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
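
    # Yield an Archive instance for every metadata record in the store.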
    @staticmethod
    def list_archives(store, keychain):
        for id in list(store.list(NS_ARCHIVE_METADATA)):
            archive = Archive(store, keychain)
            archive.load(id)
            yield archive