archive.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Target chunk size (in bytes) handed to the rolling-hash chunkifier;
# a prime, presumably to spread rolling-hash cut points — TODO confirm.
CHUNK_SIZE = 55001
  13. class Archive(object):
  14. def __init__(self, store, crypto, name=None):
  15. self.crypto = crypto
  16. self.store = store
  17. self.items = []
  18. self.chunks = []
  19. self.chunk_idx = {}
  20. self.hard_links = {}
  21. if name:
  22. self.load(self.crypto.id_hash(name))
  23. def load(self, id):
  24. self.id = id
  25. data, hash = self.crypto.decrypt(self.store.get(NS_ARCHIVES, self.id))
  26. archive = msgpack.unpackb(data)
  27. if archive['version'] != 1:
  28. raise Exception('Archive version %r not supported' % archive['version'])
  29. self.items = archive['items']
  30. self.name = archive['name']
  31. data, hash = self.crypto.decrypt(self.store.get(NS_CINDEX, self.id))
  32. cindex = msgpack.unpackb(data)
  33. assert cindex['version'] == 1
  34. if archive['cindex'] != hash:
  35. raise Exception('decryption failed')
  36. self.chunks = cindex['chunks']
  37. for i, chunk in enumerate(self.chunks):
  38. self.chunk_idx[i] = chunk[0]
  39. def save(self, name):
  40. self.id = self.crypto.id_hash(name)
  41. cindex = {
  42. 'version': 1,
  43. 'chunks': self.chunks,
  44. }
  45. data, cindex_hash = self.crypto.encrypt_create(msgpack.packb(cindex))
  46. self.store.put(NS_CINDEX, self.id, data)
  47. archive = {
  48. 'version': 1,
  49. 'name': name,
  50. 'cindex': cindex_hash,
  51. 'cmdline': sys.argv,
  52. 'hostname': socket.gethostname(),
  53. 'username': getuser(),
  54. 'ts': datetime.utcnow().isoformat(),
  55. 'items': self.items,
  56. }
  57. data, self.hash = self.crypto.encrypt_read(msgpack.packb(archive))
  58. self.store.put(NS_ARCHIVES, self.id, data)
  59. self.store.commit()
  60. def add_chunk(self, id, size):
  61. try:
  62. return self.chunk_idx[id]
  63. except KeyError:
  64. idx = len(self.chunks)
  65. self.chunks.append((id, size))
  66. self.chunk_idx[id] = idx
  67. return idx
  68. def stats(self, cache):
  69. osize = csize = usize = 0
  70. for item in self.items:
  71. if item['type'] == 'FILE':
  72. osize += item['size']
  73. for id, size in self.chunks:
  74. csize += size
  75. if cache.seen_chunk(id) == 1:
  76. usize += size
  77. return osize, csize, usize
  78. def list(self):
  79. for item in self.items:
  80. print item['path']
  81. def extract(self, dest=None):
  82. dest = dest or os.getcwdu()
  83. dir_stat_queue = []
  84. for item in self.items:
  85. assert item['path'][0] not in ('/', '\\', ':')
  86. path = os.path.join(dest, item['path'].decode('utf-8'))
  87. if item['type'] == 'DIRECTORY':
  88. logging.info(path)
  89. if not os.path.exists(path):
  90. os.makedirs(path)
  91. dir_stat_queue.append((path, item))
  92. continue
  93. elif item['type'] == 'SYMLINK':
  94. if not os.path.exists(os.path.dirname(path)):
  95. os.makedirs(os.path.dirname(path))
  96. source = item['source']
  97. logging.info('%s -> %s', path, source)
  98. if os.path.exists(path):
  99. os.unlink(path)
  100. os.symlink(source, path)
  101. self.restore_stat(path, item, call_utime=False)
  102. elif item['type'] == 'HARDLINK':
  103. if not os.path.exists(os.path.dirname(path)):
  104. os.makedirs(os.path.dirname(path))
  105. source = os.path.join(dest, item['source'])
  106. logging.info('%s => %s', path, source)
  107. if os.path.exists(path):
  108. os.unlink(path)
  109. os.link(source, path)
  110. elif item['type'] == 'FILE':
  111. logging.info(path)
  112. if not os.path.exists(os.path.dirname(path)):
  113. os.makedirs(os.path.dirname(path))
  114. with open(path, 'wb') as fd:
  115. for chunk in item['chunks']:
  116. id = self.chunk_idx[chunk]
  117. try:
  118. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  119. if self.crypto.id_hash(data) != id:
  120. raise IntegrityError('chunk id did not match')
  121. fd.write(data)
  122. except ValueError:
  123. raise Exception('Invalid chunk checksum')
  124. self.restore_stat(path, item)
  125. else:
  126. raise Exception('Unknown archive item type %r' % item['type'])
  127. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  128. self.restore_stat(*dir_stat_queue.pop())
  129. def restore_stat(self, path, item, call_utime=True):
  130. os.lchmod(path, item['mode'])
  131. uid = user2uid(item['user']) or item['uid']
  132. gid = group2gid(item['group']) or item['gid']
  133. try:
  134. os.lchown(path, uid, gid)
  135. except OSError:
  136. pass
  137. if call_utime:
  138. # FIXME: We should really call futimes here (c extension required)
  139. os.utime(path, (item['ctime'], item['mtime']))
  140. def verify(self):
  141. for item in self.items:
  142. if item['type'] == 'FILE':
  143. item['path'] = item['path'].decode('utf-8')
  144. for chunk in item['chunks']:
  145. id = self.chunk_idx[chunk]
  146. try:
  147. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  148. if self.crypto.id_hash(data) != id:
  149. raise IntegrityError('chunk id did not match')
  150. except IntegrityError:
  151. logging.error('%s ... ERROR', item['path'])
  152. break
  153. else:
  154. logging.info('%s ... OK', item['path'])
  155. def delete(self, cache):
  156. self.store.delete(NS_ARCHIVES, self.id)
  157. self.store.delete(NS_CINDEX, self.id)
  158. for id, size in self.chunks:
  159. cache.chunk_decref(id)
  160. self.store.commit()
  161. cache.save()
  162. def _walk(self, path):
  163. st = os.lstat(path)
  164. yield path, st
  165. if stat.S_ISDIR(st.st_mode):
  166. for f in os.listdir(path):
  167. for x in self._walk(os.path.join(path, f)):
  168. yield x
  169. def create(self, name, paths, cache):
  170. id = self.crypto.id_hash(name)
  171. try:
  172. self.store.get(NS_ARCHIVES, id)
  173. except self.store.DoesNotExist:
  174. pass
  175. else:
  176. raise NameError('Archive already exists')
  177. for path in paths:
  178. for path, st in self._walk(unicode(path)):
  179. if stat.S_ISDIR(st.st_mode):
  180. self.process_dir(path, st)
  181. elif stat.S_ISLNK(st.st_mode):
  182. self.process_symlink(path, st)
  183. elif stat.S_ISREG(st.st_mode):
  184. self.process_file(path, st, cache)
  185. else:
  186. logging.error('Unknown file type: %s', path)
  187. self.save(name)
  188. cache.save()
  189. def process_dir(self, path, st):
  190. path = path.lstrip('/\\:')
  191. logging.info(path)
  192. self.items.append({
  193. 'type': 'DIRECTORY', 'path': path,
  194. 'mode': st.st_mode,
  195. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  196. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  197. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  198. })
  199. def process_symlink(self, path, st):
  200. source = os.readlink(path)
  201. path = path.lstrip('/\\:')
  202. logging.info('%s -> %s', path, source)
  203. self.items.append({
  204. 'type': 'SYMLINK', 'path': path, 'source': source,
  205. 'mode': st.st_mode,
  206. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  207. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  208. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  209. })
  210. def process_file(self, path, st, cache):
  211. safe_path = path.lstrip('/\\:')
  212. if st.st_nlink > 1:
  213. source = self.hard_links.get((st.st_ino, st.st_dev))
  214. if (st.st_ino, st.st_dev) in self.hard_links:
  215. logging.info('%s => %s', path, source)
  216. self.items.append({ 'type': 'HARDLINK',
  217. 'path': path, 'source': source})
  218. return
  219. else:
  220. self.hard_links[st.st_ino, st.st_dev] = safe_path
  221. try:
  222. fd = open(path, 'rb')
  223. except IOError, e:
  224. logging.error(e)
  225. return
  226. with fd:
  227. logging.info(safe_path)
  228. chunks = []
  229. size = 0
  230. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  231. chunks.append(self.process_chunk(chunk, cache))
  232. size += len(chunk)
  233. self.items.append({
  234. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  235. 'mode': st.st_mode,
  236. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  237. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  238. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  239. })
  240. def process_chunk(self, data, cache):
  241. id = self.crypto.id_hash(data)
  242. try:
  243. return self.chunk_idx[id]
  244. except KeyError:
  245. idx = len(self.chunks)
  246. size = cache.add_chunk(id, data, self.crypto)
  247. self.chunks.append((id, size))
  248. self.chunk_idx[id] = idx
  249. return idx
  250. @staticmethod
  251. def list_archives(store, crypto):
  252. for id in store.list(NS_ARCHIVES):
  253. archive = Archive(store, crypto)
  254. archive.load(id)
  255. yield archive