archive.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. from datetime import datetime
  2. from getpass import getuser
  3. import msgpack
  4. import os
  5. import socket
  6. import stat
  7. import sys
  8. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
  9. from .chunkifier import chunkify
  10. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Chunk size handed to chunkify() when splitting regular-file contents.
CHUNK_SIZE = 55001
# os.lchmod only exists on some platforms (e.g. BSD/macOS);
# restore_attrs() falls back to os.chmod when it is missing.
have_lchmod = hasattr(os, 'lchmod')
class Archive(object):
    """A named backup archive: a list of filesystem items whose file data
    is stored as encrypted chunks in the backing store."""

    class DoesNotExist(Exception):
        # Raised by load() when the store has no metadata for the given id.
        pass
  16. def __init__(self, store, crypto, name=None):
  17. self.crypto = crypto
  18. self.store = store
  19. self.items = []
  20. self.chunks = []
  21. self.chunk_idx = {}
  22. self.hard_links = {}
  23. if name:
  24. self.load(self.crypto.id_hash(name))
  25. def load(self, id):
  26. self.id = id
  27. try:
  28. data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  29. except self.store.DoesNotExist:
  30. raise self.DoesNotExist
  31. self.metadata = msgpack.unpackb(data)
  32. assert self.metadata['version'] == 1
  33. def get_items(self):
  34. data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
  35. chunks = msgpack.unpackb(data)
  36. assert chunks['version'] == 1
  37. assert self.metadata['chunks_hash'] == chunks_hash
  38. self.chunks = chunks['chunks']
  39. data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
  40. items = msgpack.unpackb(data)
  41. assert items['version'] == 1
  42. assert self.metadata['items_hash'] == items_hash
  43. self.items = items['items']
  44. for i, chunk in enumerate(self.chunks):
  45. self.chunk_idx[i] = chunk[0]
  46. def save(self, name):
  47. self.id = self.crypto.id_hash(name)
  48. chunks = {'version': 1, 'chunks': self.chunks}
  49. data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
  50. self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
  51. items = {'version': 1, 'items': self.items}
  52. data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
  53. self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
  54. metadata = {
  55. 'version': 1,
  56. 'name': name,
  57. 'chunks_hash': chunks_hash,
  58. 'items_hash': items_hash,
  59. 'cmdline': sys.argv,
  60. 'hostname': socket.gethostname(),
  61. 'username': getuser(),
  62. 'time': datetime.utcnow().isoformat(),
  63. }
  64. data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
  65. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  66. self.store.commit()
  67. def add_chunk(self, id, size):
  68. try:
  69. return self.chunk_idx[id]
  70. except KeyError:
  71. idx = len(self.chunks)
  72. self.chunks.append((id, size))
  73. self.chunk_idx[id] = idx
  74. return idx
  75. def stats(self, cache):
  76. self.get_items()
  77. osize = csize = usize = 0
  78. for item in self.items:
  79. if stat.S_ISREG(item['mode']) and not 'source' in item:
  80. osize += item['size']
  81. for id, size in self.chunks:
  82. csize += size
  83. if cache.seen_chunk(id) == 1:
  84. usize += size
  85. return osize, csize, usize
  86. def extract_item(self, item, dest=None):
  87. dest = dest or os.getcwdu()
  88. dir_stat_queue = []
  89. assert item['path'][0] not in ('/', '\\', ':')
  90. path = os.path.join(dest, item['path'].decode('utf-8'))
  91. mode = item['mode']
  92. if stat.S_ISDIR(mode):
  93. if not os.path.exists(path):
  94. os.makedirs(path)
  95. self.restore_attrs(path, item)
  96. elif stat.S_ISFIFO(mode):
  97. if not os.path.exists(os.path.dirname(path)):
  98. os.makedirs(os.path.dirname(path))
  99. os.mkfifo(path)
  100. self.restore_attrs(path, item)
  101. elif stat.S_ISLNK(mode):
  102. if not os.path.exists(os.path.dirname(path)):
  103. os.makedirs(os.path.dirname(path))
  104. source = item['source']
  105. if os.path.exists(path):
  106. os.unlink(path)
  107. os.symlink(source, path)
  108. self.restore_attrs(path, item, symlink=True)
  109. elif stat.S_ISREG(mode):
  110. if not os.path.exists(os.path.dirname(path)):
  111. os.makedirs(os.path.dirname(path))
  112. # Hard link?
  113. if 'source' in item:
  114. source = os.path.join(dest, item['source'])
  115. if os.path.exists(path):
  116. os.unlink(path)
  117. os.link(source, path)
  118. else:
  119. with open(path, 'wb') as fd:
  120. for chunk in item['chunks']:
  121. id = self.chunk_idx[chunk]
  122. try:
  123. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  124. if self.crypto.id_hash(data) != id:
  125. raise IntegrityError('chunk id did not match')
  126. fd.write(data)
  127. except ValueError:
  128. raise Exception('Invalid chunk checksum')
  129. self.restore_attrs(path, item)
  130. else:
  131. raise Exception('Unknown archive item type %r' % item['mode'])
    def restore_attrs(self, path, item, symlink=False):
        """Apply mode, ownership and timestamps from *item* to *path*."""
        # lchmod sets the mode without following symlinks; where it is
        # unavailable, only set the mode on non-symlinks.
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        # Prefer the archived symbolic user/group names, falling back to
        # the numeric ids when they do not resolve on this system.
        # NOTE(review): the `or` fallback also triggers when the name
        # resolves to id 0 (root) -- confirm user2uid/group2gid return
        # None (not 0) for unknown names.
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            # Best effort: chown typically fails when not running as root.
            pass
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['atime'], item['mtime']))
  146. def verify_file(self, item):
  147. for chunk in item['chunks']:
  148. id = self.chunk_idx[chunk]
  149. try:
  150. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  151. if self.crypto.id_hash(data) != id:
  152. raise IntegrityError('chunk id did not match')
  153. except IntegrityError:
  154. return False
  155. return True
  156. def delete(self, cache):
  157. self.get_items()
  158. self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
  159. self.store.delete(NS_ARCHIVE_ITEMS, self.id)
  160. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  161. for id, size in self.chunks:
  162. cache.chunk_decref(id)
  163. self.store.commit()
  164. cache.save()
  165. def stat_attrs(self, st):
  166. return {
  167. 'mode': st.st_mode,
  168. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  169. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  170. 'atime': st.st_atime, 'mtime': st.st_mtime,
  171. }
  172. def process_dir(self, path, st):
  173. item = {'path': path.lstrip('/\\:')}
  174. item.update(self.stat_attrs(st))
  175. self.items.append(item)
  176. def process_fifo(self, path, st):
  177. item = {'path': path.lstrip('/\\:')}
  178. item.update(self.stat_attrs(st))
  179. self.items.append(item)
  180. def process_symlink(self, path, st):
  181. source = os.readlink(path)
  182. item = {'path': path.lstrip('/\\:'), 'source': source}
  183. item.update(self.stat_attrs(st))
  184. self.items.append(item)
  185. def process_file(self, path, st, cache):
  186. safe_path = path.lstrip('/\\:')
  187. # Is it a hard link?
  188. if st.st_nlink > 1:
  189. source = self.hard_links.get((st.st_ino, st.st_dev))
  190. if (st.st_ino, st.st_dev) in self.hard_links:
  191. self.items.append({'path': path, 'source': source})
  192. return
  193. else:
  194. self.hard_links[st.st_ino, st.st_dev] = safe_path
  195. path_hash = self.crypto.id_hash(path.encode('utf-8'))
  196. ids, size = cache.file_known_and_unchanged(path_hash, st)
  197. if ids is not None:
  198. # Make sure all ids are available
  199. for id in ids:
  200. if not cache.seen_chunk(id):
  201. ids = None
  202. break
  203. else:
  204. chunks = [self.process_chunk2(id, cache) for id in ids]
  205. # Only chunkify the file if needed
  206. if ids is None:
  207. fd = open(path, 'rb')
  208. with open(path, 'rb') as fd:
  209. size = 0
  210. ids = []
  211. chunks = []
  212. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  213. id = self.crypto.id_hash(chunk)
  214. ids.append(id)
  215. try:
  216. chunks.append(self.chunk_idx[id])
  217. except KeyError:
  218. chunks.append(self.process_chunk(id, chunk, cache))
  219. size += len(chunk)
  220. cache.memorize_file_chunks(path_hash, st, ids)
  221. item = {'path': safe_path, 'chunks': chunks, 'size': size}
  222. item.update(self.stat_attrs(st))
  223. self.items.append(item)
  224. def process_chunk2(self, id, cache):
  225. try:
  226. return self.chunk_idx[id]
  227. except KeyError:
  228. idx = len(self.chunks)
  229. size = cache.chunk_incref(id)
  230. self.chunks.append((id, size))
  231. self.chunk_idx[id] = idx
  232. return idx
  233. def process_chunk(self, id, data, cache):
  234. idx = len(self.chunks)
  235. size = cache.add_chunk(id, data)
  236. self.chunks.append((id, size))
  237. self.chunk_idx[id] = idx
  238. return idx
  239. @staticmethod
  240. def list_archives(store, crypto):
  241. for id in list(store.list(NS_ARCHIVE_METADATA)):
  242. archive = Archive(store, crypto)
  243. archive.load(id)
  244. yield archive