archive.py

from datetime import datetime
from getpass import getuser
import msgpack
import os
import socket
import stat
import sys
from xattr import xattr, XATTR_NOFOLLOW

from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
from .chunkifier import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError

CHUNK_SIZE = 55001

have_lchmod = hasattr(os, 'lchmod')
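

# The Archive class below persists three msgpack-encoded, encrypted records in
# the store, all keyed by id_hash(name): the archive metadata
# (NS_ARCHIVE_METADATA), the list of archived items (NS_ARCHIVE_ITEMS) and the
# list of referenced chunks (NS_ARCHIVE_CHUNKS).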
class Archive(object):

    class DoesNotExist(Exception):
        pass

    def __init__(self, store, crypto, name=None):
        self.crypto = crypto
        self.store = store
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        self.hard_links = {}
        if name:
            self.load(self.crypto.id_hash(name))

    def load(self, id):
        self.id = id
        try:
            data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
        except self.store.DoesNotExist:
            raise self.DoesNotExist
        self.metadata = msgpack.unpackb(data)
        assert self.metadata['version'] == 1

    def get_items(self):
        data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
        chunks = msgpack.unpackb(data)
        assert chunks['version'] == 1
        assert self.metadata['chunks_hash'] == chunks_hash
        self.chunks = chunks['chunks']
        data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
        items = msgpack.unpackb(data)
        assert items['version'] == 1
        assert self.metadata['items_hash'] == items_hash
        self.items = items['items']
        for i, chunk in enumerate(self.chunks):
            self.chunk_idx[i] = chunk[0]
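
    # save() writes the chunk list, the item list and the archive metadata to
    # the store as three separate records, and embeds the hashes of the first
    # two in the metadata so that get_items() can verify them on read.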
    def save(self, name):
        self.id = self.crypto.id_hash(name)
        chunks = {'version': 1, 'chunks': self.chunks}
        data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
        self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
        items = {'version': 1, 'items': self.items}
        data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
        self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
        metadata = {
            'version': 1,
            'name': name,
            'chunks_hash': chunks_hash,
            'items_hash': items_hash,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
        self.store.put(NS_ARCHIVE_METADATA, self.id, data)
        self.store.commit()

    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx

    def stats(self, cache):
        self.get_items()
        osize = csize = usize = 0
        for item in self.items:
            if stat.S_ISREG(item['mode']) and 'source' not in item:
                osize += item['size']
        for id, size in self.chunks:
            csize += size
            if cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize
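
    # extract_item() recreates a single archived item (directory, fifo,
    # symlink, hard link or regular file) under dest, fetching each
    # regular-file chunk from the store and verifying its id before writing.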
    def extract_item(self, item, dest=None):
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        try:
                            data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
                            if self.crypto.id_hash(data) != id:
                                raise IntegrityError('chunk id did not match')
                            fd.write(data)
                        except ValueError:
                            raise Exception('Invalid chunk checksum')
                self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])

    def restore_attrs(self, path, item, symlink=False):
        xattrs = item.get('xattrs')
        if xattrs:
            try:
                xa = xattr(path, XATTR_NOFOLLOW)
                for k, v in xattrs.items():
                    xa.set(k, v)
            except IOError:
                pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))
    def verify_file(self, item):
        for chunk in item['chunks']:
            id = self.chunk_idx[chunk]
            try:
                data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
                if self.crypto.id_hash(data) != id:
                    raise IntegrityError('chunk id did not match')
            except IntegrityError:
                return False
        return True

    def delete(self, cache):
        self.get_items()
        self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
        self.store.delete(NS_ARCHIVE_ITEMS, self.id)
        self.store.delete(NS_ARCHIVE_METADATA, self.id)
        for id, size in self.chunks:
            cache.chunk_decref(id)
        self.store.commit()
        cache.save()

    def stat_attrs(self, st, path):
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'atime': st.st_atime, 'mtime': st.st_mtime,
        }
        try:
            item['xattrs'] = dict(xattr(path, XATTR_NOFOLLOW))
        except IOError:
            pass
        return item

    def process_dir(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)

    def process_fifo(self, path, st):
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)
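
    # process_file() first asks the cache whether the file is known and
    # unchanged; only if that lookup fails (or one of the cached chunks is no
    # longer in the store) is the file re-read and re-chunked with chunkify().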
    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                self.items.append({'path': safe_path, 'source': source})
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.crypto.id_hash(path.encode('utf-8'))
        ids, size = cache.file_known_and_unchanged(path_hash, st)
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    ids = None
                    break
            else:
                chunks = [self.process_chunk2(id, cache) for id in ids]
        # Only chunkify the file if needed
        if ids is None:
            with open(path, 'rb') as fd:
                size = 0
                ids = []
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, 30):
                    id = self.crypto.id_hash(chunk)
                    ids.append(id)
                    try:
                        chunks.append(self.chunk_idx[id])
                    except KeyError:
                        chunks.append(self.process_chunk(id, chunk, cache))
                    size += len(chunk)
            cache.memorize_file_chunks(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks, 'size': size}
        item.update(self.stat_attrs(st, path))
        self.items.append(item)
    def process_chunk2(self, id, cache):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            size = cache.chunk_incref(id)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx

    def process_chunk(self, id, data, cache):
        idx = len(self.chunks)
        size = cache.add_chunk(id, data)
        self.chunks.append((id, size))
        self.chunk_idx[id] = idx
        return idx

    @staticmethod
    def list_archives(store, crypto):
        for id in list(store.list(NS_ARCHIVE_METADATA)):
            archive = Archive(store, crypto)
            archive.load(id)
            yield archive
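

# Rough usage sketch (illustrative only; `store`, `crypto` and `cache` stand in
# for the concrete store, crypto and cache objects wired up elsewhere in the
# program):
#
#   archive = Archive(store, crypto)
#   archive.process_file('/etc/hosts', os.lstat('/etc/hosts'), cache)
#   archive.save('my-backup')
#
#   for archive in Archive.list_archives(store, crypto):
#       print(archive.metadata['name'])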