# archive.py

from datetime import datetime
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from xattr import xattr, XATTR_NOFOLLOW
from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
from .chunkifier import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError

CHUNK_SIZE = 55001

have_lchmod = hasattr(os, 'lchmod')
linux = sys.platform == 'linux2'
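
# An Archive represents one backup run: a list of items (files, directories,
# symlinks, FIFOs) plus the list of data chunks they reference.  The item
# list, chunk list and archive metadata are stored as three separate
# encrypted msgpack blobs, keyed by the hash of the archive name.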
class Archive(object):

    class DoesNotExist(Exception):
        pass

    def __init__(self, store, keychain, name=None):
        self.keychain = keychain
        self.store = store
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        self.hard_links = {}
        if name:
            self.load(self.keychain.id_hash(name))

    def load(self, id):
        self.id = id
        try:
            data, self.hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        assert self.metadata['version'] == 1
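
    # get_items() fetches the encrypted chunk and item lists from the store,
    # checks their hashes against the archive metadata and builds the
    # index -> chunk id mapping used when extracting or verifying files.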
    def get_items(self):
        data, chunks_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
        chunks = msgpack.unpackb(data)
        assert chunks['version'] == 1
        assert self.metadata['chunks_hash'] == chunks_hash
        self.chunks = chunks['chunks']
        data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
        items = msgpack.unpackb(data)
        assert items['version'] == 1
        assert self.metadata['items_hash'] == items_hash
        self.items = items['items']
        for i, chunk in enumerate(self.chunks):
            self.chunk_idx[i] = chunk[0]
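
    # save() writes the chunk list, the item list and the archive metadata
    # (including the hashes of the other two blobs) to the store and commits.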
    def save(self, name):
        self.id = self.keychain.id_hash(name)
        chunks = {'version': 1, 'chunks': self.chunks}
        data, chunks_hash = self.keychain.encrypt_create(msgpack.packb(chunks))
        self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
        items = {'version': 1, 'items': self.items}
        data, items_hash = self.keychain.encrypt_read(msgpack.packb(items))
        self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
        metadata = {
            'version': 1,
            'name': name,
            'chunks_hash': chunks_hash,
            'items_hash': items_hash,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.keychain.encrypt_read(msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()
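
    # add_chunk() deduplicates within the archive: a chunk id that is already
    # referenced simply returns its existing index in self.chunks.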
    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx

    def stats(self, cache):
        self.get_items()
        osize = csize = usize = 0
        for item in self.items:
            if stat.S_ISREG(item['mode']) and 'source' not in item:
                osize += item['size']
        for id, size in self.chunks:
            csize += size
            if cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize
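
    # extract_item() recreates a single item below `dest`: directories, FIFOs,
    # symlinks and hard links are recreated directly, regular files are
    # reassembled chunk by chunk, with each chunk's id verified after
    # decryption.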
    def extract_item(self, item, dest=None):
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        try:
                            data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
                            if self.keychain.id_hash(data) != id:
                                raise IntegrityError('chunk id did not match')
                            fd.write(data)
                        except ValueError:
                            raise Exception('Invalid chunk checksum')
                self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
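
    # restore_attrs() puts back extended attributes, mode, ownership and
    # timestamps; symlinks only get what can be set without following them.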
    def restore_attrs(self, path, item, symlink=False):
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except KeyError:
                    pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))
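
    # verify_file() re-reads and decrypts every chunk of a file item and
    # reports whether all chunk ids still match their contents.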
    def verify_file(self, item):
        for chunk in item['chunks']:
            id = self.chunk_idx[chunk]
            try:
                data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
                if self.keychain.id_hash(data) != id:
                    raise IntegrityError('chunk id did not match')
            except IntegrityError:
                return False
        return True

    def delete(self, cache):
        self.get_items()
        self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
        self.store.delete(NS_ARCHIVE_ITEMS, self.id)
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        for id, size in self.chunks:
            cache.chunk_decref(id)
        self.store.commit()
        cache.save()
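
    # stat_attrs() turns an os.stat() result (plus xattrs) into the metadata
    # dict shared by all item types.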
    def stat_attrs(self, st, path):
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'atime': st.st_atime, 'mtime': st.st_mtime,
        }
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item
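
    # Directories, FIFOs and symlinks carry no chunk data; only the path,
    # the stat attributes and (for symlinks) the link target are recorded.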
    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)
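
    # process_file() handles hard links via the (inode, device) map, reuses
    # the chunk ids remembered by the cache for unchanged files and only falls
    # back to chunkifying the file contents when necessary.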
    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = {'path': safe_path, 'source': source}
                item.update(self.stat_attrs(st, path))
                self.items.append(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.keychain.id_hash(path.encode('utf-8'))
        ids, size = cache.file_known_and_unchanged(path_hash, st)
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    ids = None
                    break
            else:
                chunks = [self.process_chunk2(id, cache) for id in ids]
        # Only chunkify the file if needed
        if ids is None:
            with open(path, 'rb') as fd:
                size = 0
                ids = []
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, 30):
                    id = self.keychain.id_hash(chunk)
                    ids.append(id)
                    try:
                        chunks.append(self.chunk_idx[id])
                    except KeyError:
                        chunks.append(self.process_chunk(id, chunk, cache))
                    size += len(chunk)
            cache.memorize_file_chunks(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks, 'size': size}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)
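
    # process_chunk2() references a chunk that already exists in the store
    # (bumping its refcount through the cache); process_chunk() stores new
    # chunk data.  Both return the chunk's index within this archive.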
    def process_chunk2(self, id, cache):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            size = cache.chunk_incref(id)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx

    def process_chunk(self, id, data, cache):
        idx = len(self.chunks)
        size = cache.add_chunk(id, data)
        self.chunks.append((id, size))
        self.chunk_idx[id] = idx
        return idx

    @staticmethod
    def list_archives(store, keychain):
        for id in list(store.list(NS_ARCHIVE_METADATA)):
            archive = Archive(store, keychain)
            archive.load(id)
            yield archive
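

# Rough usage sketch (illustrative only; the actual store, keychain and cache
# objects are created by the calling code and are not shown here):
#
#     archive = Archive(store, keychain)
#     archive.process_file(path, os.lstat(path), cache)
#     archive.save('my-backup')
#
#     for archive in Archive.list_archives(store, keychain):
#         print archive.metadata['name']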