# archive.py
  1. from datetime import datetime
  2. import logging
  3. import msgpack
  4. import os
  5. import socket
  6. import stat
  7. import sys
  8. from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
  9. from .chunkifier import chunkify
  10. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Target chunk size in bytes handed to the chunkifier (see chunkify() calls).
CHUNK_SIZE = 55001
  12. class Archive(object):
  13. def __init__(self, store, crypto, name=None):
  14. self.crypto = crypto
  15. self.store = store
  16. self.items = []
  17. self.chunks = []
  18. self.chunk_idx = {}
  19. self.hard_links = {}
  20. if name:
  21. self.load(self.crypto.id_hash(name))
  22. def load(self, id):
  23. self.id = id
  24. data, hash = self.crypto.decrypt(self.store.get(NS_ARCHIVES, self.id))
  25. archive = msgpack.unpackb(data)
  26. if archive['version'] != 1:
  27. raise Exception('Archive version %r not supported' % archive['version'])
  28. self.items = archive['items']
  29. self.name = archive['name']
  30. data, hash = self.crypto.decrypt(self.store.get(NS_CINDEX, self.id))
  31. cindex = msgpack.unpackb(data)
  32. assert cindex['version'] == 1
  33. if archive['cindex'] != hash:
  34. raise Exception('decryption failed')
  35. self.chunks = cindex['chunks']
  36. for i, chunk in enumerate(self.chunks):
  37. self.chunk_idx[i] = chunk[0]
  38. def save(self, name):
  39. self.id = self.crypto.id_hash(name)
  40. cindex = {
  41. 'version': 1,
  42. 'chunks': self.chunks,
  43. }
  44. data, cindex_hash = self.crypto.encrypt_create(msgpack.packb(cindex))
  45. self.store.put(NS_CINDEX, self.id, data)
  46. archive = {
  47. 'version': 1,
  48. 'name': name,
  49. 'cindex': cindex_hash,
  50. 'cmdline': sys.argv,
  51. 'hostname': socket.gethostname(),
  52. 'ts': datetime.utcnow().isoformat(),
  53. 'items': self.items,
  54. }
  55. data, hash = self.crypto.encrypt_read(msgpack.packb(archive))
  56. self.store.put(NS_ARCHIVES, self.id, data)
  57. self.store.commit()
  58. def add_chunk(self, id, size):
  59. try:
  60. return self.chunk_idx[id]
  61. except KeyError:
  62. idx = len(self.chunks)
  63. self.chunks.append((id, size))
  64. self.chunk_idx[id] = idx
  65. return idx
  66. def stats(self, cache):
  67. osize = csize = usize = 0
  68. for item in self.items:
  69. if item['type'] == 'FILE':
  70. osize += item['size']
  71. for id, size in self.chunks:
  72. csize += size
  73. if cache.seen_chunk(id) == 1:
  74. usize += size
  75. return osize, csize, usize
  76. def list(self):
  77. for item in self.items:
  78. print item['path']
  79. def extract(self, dest=None):
  80. dest = dest or os.getcwdu()
  81. dir_stat_queue = []
  82. for item in self.items:
  83. assert item['path'][0] not in ('/', '\\', ':')
  84. path = os.path.join(dest, item['path'].decode('utf-8'))
  85. if item['type'] == 'DIRECTORY':
  86. logging.info(path)
  87. if not os.path.exists(path):
  88. os.makedirs(path)
  89. dir_stat_queue.append((path, item))
  90. continue
  91. elif item['type'] == 'SYMLINK':
  92. if not os.path.exists(os.path.dirname(path)):
  93. os.makedirs(os.path.dirname(path))
  94. source = item['source']
  95. logging.info('%s -> %s', path, source)
  96. if os.path.exists(path):
  97. os.unlink(path)
  98. os.symlink(source, path)
  99. self.restore_stat(path, item, call_utime=False)
  100. elif item['type'] == 'HARDLINK':
  101. if not os.path.exists(os.path.dirname(path)):
  102. os.makedirs(os.path.dirname(path))
  103. source = os.path.join(dest, item['source'])
  104. logging.info('%s => %s', path, source)
  105. if os.path.exists(path):
  106. os.unlink(path)
  107. os.link(source, path)
  108. elif item['type'] == 'FILE':
  109. logging.info(path)
  110. if not os.path.exists(os.path.dirname(path)):
  111. os.makedirs(os.path.dirname(path))
  112. with open(path, 'wb') as fd:
  113. for chunk in item['chunks']:
  114. id = self.chunk_idx[chunk]
  115. try:
  116. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  117. if self.crypto.id_hash(data) != id:
  118. raise IntegrityError('chunk id did not match')
  119. fd.write(data)
  120. except ValueError:
  121. raise Exception('Invalid chunk checksum')
  122. self.restore_stat(path, item)
  123. else:
  124. raise Exception('Unknown archive item type %r' % item['type'])
  125. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  126. self.restore_stat(*dir_stat_queue.pop())
  127. def restore_stat(self, path, item, call_utime=True):
  128. os.lchmod(path, item['mode'])
  129. uid = user2uid(item['user']) or item['uid']
  130. gid = group2gid(item['group']) or item['gid']
  131. try:
  132. os.lchown(path, uid, gid)
  133. except OSError:
  134. pass
  135. if call_utime:
  136. # FIXME: We should really call futimes here (c extension required)
  137. os.utime(path, (item['ctime'], item['mtime']))
  138. def verify(self):
  139. for item in self.items:
  140. if item['type'] == 'FILE':
  141. item['path'] = item['path'].decode('utf-8')
  142. for chunk in item['chunks']:
  143. id = self.chunk_idx[chunk]
  144. try:
  145. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  146. if self.crypto.id_hash(data) != id:
  147. raise IntegrityError('chunk id did not match')
  148. except IntegrityError:
  149. logging.error('%s ... ERROR', item['path'])
  150. break
  151. else:
  152. logging.info('%s ... OK', item['path'])
  153. def delete(self, cache):
  154. self.store.delete(NS_ARCHIVES, self.id)
  155. self.store.delete(NS_CINDEX, self.id)
  156. for id, size in self.chunks:
  157. cache.chunk_decref(id)
  158. self.store.commit()
  159. cache.save()
  160. def _walk(self, path):
  161. st = os.lstat(path)
  162. yield path, st
  163. if stat.S_ISDIR(st.st_mode):
  164. for f in os.listdir(path):
  165. for x in self._walk(os.path.join(path, f)):
  166. yield x
  167. def create(self, name, paths, cache):
  168. try:
  169. self.store.get(NS_ARCHIVES, name)
  170. except self.store.DoesNotExist:
  171. pass
  172. else:
  173. raise NameError('Archive already exists')
  174. for path in paths:
  175. for path, st in self._walk(unicode(path)):
  176. if stat.S_ISDIR(st.st_mode):
  177. self.process_dir(path, st)
  178. elif stat.S_ISLNK(st.st_mode):
  179. self.process_symlink(path, st)
  180. elif stat.S_ISREG(st.st_mode):
  181. self.process_file(path, st, cache)
  182. else:
  183. logging.error('Unknown file type: %s', path)
  184. self.save(name)
  185. cache.save()
  186. def process_dir(self, path, st):
  187. path = path.lstrip('/\\:')
  188. logging.info(path)
  189. self.items.append({
  190. 'type': 'DIRECTORY', 'path': path,
  191. 'mode': st.st_mode,
  192. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  193. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  194. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  195. })
  196. def process_symlink(self, path, st):
  197. source = os.readlink(path)
  198. path = path.lstrip('/\\:')
  199. logging.info('%s -> %s', path, source)
  200. self.items.append({
  201. 'type': 'SYMLINK', 'path': path, 'source': source,
  202. 'mode': st.st_mode,
  203. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  204. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  205. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  206. })
  207. def process_file(self, path, st, cache):
  208. safe_path = path.lstrip('/\\:')
  209. if st.st_nlink > 1:
  210. source = self.hard_links.get((st.st_ino, st.st_dev))
  211. if (st.st_ino, st.st_dev) in self.hard_links:
  212. logging.info('%s => %s', path, source)
  213. self.items.append({ 'type': 'HARDLINK',
  214. 'path': path, 'source': source})
  215. return
  216. else:
  217. self.hard_links[st.st_ino, st.st_dev] = safe_path
  218. try:
  219. fd = open(path, 'rb')
  220. except IOError, e:
  221. logging.error(e)
  222. return
  223. with fd:
  224. logging.info(safe_path)
  225. chunks = []
  226. size = 0
  227. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  228. chunks.append(self.process_chunk(chunk, cache))
  229. size += len(chunk)
  230. self.items.append({
  231. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  232. 'mode': st.st_mode,
  233. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  234. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  235. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  236. })
  237. def process_chunk(self, data, cache):
  238. id = self.crypto.id_hash(data)
  239. try:
  240. return self.chunk_idx[id]
  241. except KeyError:
  242. idx = len(self.chunks)
  243. size = cache.add_chunk(id, data, self.crypto)
  244. self.chunks.append((id, size))
  245. self.chunk_idx[id] = idx
  246. return idx
  247. @staticmethod
  248. def list_archives(store, crypto):
  249. for id in store.list(NS_ARCHIVES):
  250. archive = Archive(store, crypto)
  251. archive.load(id)
  252. yield archive