# archive.py
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
  12. CHUNK_SIZE = 55001
  13. class Archive(object):
  14. def __init__(self, store, crypto, name=None):
  15. self.crypto = crypto
  16. self.store = store
  17. self.items = []
  18. self.chunks = []
  19. self.chunk_idx = {}
  20. self.hard_links = {}
  21. if name:
  22. self.load(self.crypto.id_hash(name))
  23. def load(self, id):
  24. self.id = id
  25. data, hash = self.crypto.decrypt(self.store.get(NS_ARCHIVES, self.id))
  26. archive = msgpack.unpackb(data)
  27. if archive['version'] != 1:
  28. raise Exception('Archive version %r not supported' % archive['version'])
  29. self.items = archive['items']
  30. self.name = archive['name']
  31. data, hash = self.crypto.decrypt(self.store.get(NS_CINDEX, self.id))
  32. cindex = msgpack.unpackb(data)
  33. assert cindex['version'] == 1
  34. if archive['cindex'] != hash:
  35. raise Exception('decryption failed')
  36. self.chunks = cindex['chunks']
  37. for i, chunk in enumerate(self.chunks):
  38. self.chunk_idx[i] = chunk[0]
  39. def save(self, name):
  40. self.id = self.crypto.id_hash(name)
  41. cindex = {
  42. 'version': 1,
  43. 'chunks': self.chunks,
  44. }
  45. data, cindex_hash = self.crypto.encrypt_create(msgpack.packb(cindex))
  46. self.store.put(NS_CINDEX, self.id, data)
  47. archive = {
  48. 'version': 1,
  49. 'name': name,
  50. 'cindex': cindex_hash,
  51. 'cmdline': sys.argv,
  52. 'hostname': socket.gethostname(),
  53. 'username': getuser(),
  54. 'ts': datetime.utcnow().isoformat(),
  55. 'items': self.items,
  56. }
  57. data, self.hash = self.crypto.encrypt_read(msgpack.packb(archive))
  58. self.store.put(NS_ARCHIVES, self.id, data)
  59. self.store.commit()
  60. def add_chunk(self, id, size):
  61. try:
  62. return self.chunk_idx[id]
  63. except KeyError:
  64. idx = len(self.chunks)
  65. self.chunks.append((id, size))
  66. self.chunk_idx[id] = idx
  67. return idx
  68. def stats(self, cache):
  69. osize = csize = usize = 0
  70. for item in self.items:
  71. if item['type'] == 'FILE':
  72. osize += item['size']
  73. for id, size in self.chunks:
  74. csize += size
  75. if cache.seen_chunk(id) == 1:
  76. usize += size
  77. return osize, csize, usize
  78. def list(self):
  79. for item in self.items:
  80. print item['path']
  81. def extract(self, dest=None):
  82. dest = dest or os.getcwdu()
  83. dir_stat_queue = []
  84. for item in self.items:
  85. assert item['path'][0] not in ('/', '\\', ':')
  86. path = os.path.join(dest, item['path'].decode('utf-8'))
  87. if item['type'] == 'DIRECTORY':
  88. logging.info(path)
  89. if not os.path.exists(path):
  90. os.makedirs(path)
  91. dir_stat_queue.append((path, item))
  92. continue
  93. elif item['type'] == 'SYMLINK':
  94. if not os.path.exists(os.path.dirname(path)):
  95. os.makedirs(os.path.dirname(path))
  96. source = item['source']
  97. logging.info('%s -> %s', path, source)
  98. if os.path.exists(path):
  99. os.unlink(path)
  100. os.symlink(source, path)
  101. self.restore_stat(path, item, symlink=True)
  102. elif item['type'] == 'HARDLINK':
  103. if not os.path.exists(os.path.dirname(path)):
  104. os.makedirs(os.path.dirname(path))
  105. source = os.path.join(dest, item['source'])
  106. logging.info('%s => %s', path, source)
  107. if os.path.exists(path):
  108. os.unlink(path)
  109. os.link(source, path)
  110. elif item['type'] == 'FILE':
  111. logging.info(path)
  112. if not os.path.exists(os.path.dirname(path)):
  113. os.makedirs(os.path.dirname(path))
  114. with open(path, 'wb') as fd:
  115. for chunk in item['chunks']:
  116. id = self.chunk_idx[chunk]
  117. try:
  118. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  119. if self.crypto.id_hash(data) != id:
  120. raise IntegrityError('chunk id did not match')
  121. fd.write(data)
  122. except ValueError:
  123. raise Exception('Invalid chunk checksum')
  124. self.restore_stat(path, item)
  125. else:
  126. raise Exception('Unknown archive item type %r' % item['type'])
  127. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  128. self.restore_stat(*dir_stat_queue.pop())
  129. def restore_stat(self, path, item, symlink=False):
  130. os.lchmod(path, item['mode'])
  131. uid = user2uid(item['user']) or item['uid']
  132. gid = group2gid(item['group']) or item['gid']
  133. try:
  134. if hasattr(os, 'lchown'): # Not available on Linux
  135. os.lchown(path, uid, gid)
  136. elif not symlink:
  137. os.chown(path, uid, gid)
  138. except OSError:
  139. pass
  140. if not symlink:
  141. # FIXME: We should really call futimes here (c extension required)
  142. os.utime(path, (item['ctime'], item['mtime']))
  143. def verify(self):
  144. for item in self.items:
  145. if item['type'] == 'FILE':
  146. item['path'] = item['path'].decode('utf-8')
  147. for chunk in item['chunks']:
  148. id = self.chunk_idx[chunk]
  149. try:
  150. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNKS, id))
  151. if self.crypto.id_hash(data) != id:
  152. raise IntegrityError('chunk id did not match')
  153. except IntegrityError:
  154. logging.error('%s ... ERROR', item['path'])
  155. break
  156. else:
  157. logging.info('%s ... OK', item['path'])
  158. def delete(self, cache):
  159. self.store.delete(NS_ARCHIVES, self.id)
  160. self.store.delete(NS_CINDEX, self.id)
  161. for id, size in self.chunks:
  162. cache.chunk_decref(id)
  163. self.store.commit()
  164. cache.save()
  165. def _walk(self, path):
  166. st = os.lstat(path)
  167. yield path, st
  168. if stat.S_ISDIR(st.st_mode):
  169. for f in os.listdir(path):
  170. for x in self._walk(os.path.join(path, f)):
  171. yield x
  172. def create(self, name, paths, cache):
  173. id = self.crypto.id_hash(name)
  174. try:
  175. self.store.get(NS_ARCHIVES, id)
  176. except self.store.DoesNotExist:
  177. pass
  178. else:
  179. raise NameError('Archive already exists')
  180. for path in paths:
  181. for path, st in self._walk(unicode(path)):
  182. if stat.S_ISDIR(st.st_mode):
  183. self.process_dir(path, st)
  184. elif stat.S_ISLNK(st.st_mode):
  185. self.process_symlink(path, st)
  186. elif stat.S_ISREG(st.st_mode):
  187. self.process_file(path, st, cache)
  188. else:
  189. logging.error('Unknown file type: %s', path)
  190. self.save(name)
  191. cache.save()
  192. def process_dir(self, path, st):
  193. path = path.lstrip('/\\:')
  194. logging.info(path)
  195. self.items.append({
  196. 'type': 'DIRECTORY', 'path': path,
  197. 'mode': st.st_mode,
  198. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  199. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  200. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  201. })
  202. def process_symlink(self, path, st):
  203. source = os.readlink(path)
  204. path = path.lstrip('/\\:')
  205. logging.info('%s -> %s', path, source)
  206. self.items.append({
  207. 'type': 'SYMLINK', 'path': path, 'source': source,
  208. 'mode': st.st_mode,
  209. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  210. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  211. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  212. })
  213. def process_file(self, path, st, cache):
  214. safe_path = path.lstrip('/\\:')
  215. if st.st_nlink > 1:
  216. source = self.hard_links.get((st.st_ino, st.st_dev))
  217. if (st.st_ino, st.st_dev) in self.hard_links:
  218. logging.info('%s => %s', path, source)
  219. self.items.append({ 'type': 'HARDLINK',
  220. 'path': path, 'source': source})
  221. return
  222. else:
  223. self.hard_links[st.st_ino, st.st_dev] = safe_path
  224. try:
  225. fd = open(path, 'rb')
  226. except IOError, e:
  227. logging.error(e)
  228. return
  229. with fd:
  230. logging.info(safe_path)
  231. chunks = []
  232. size = 0
  233. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  234. chunks.append(self.process_chunk(chunk, cache))
  235. size += len(chunk)
  236. self.items.append({
  237. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  238. 'mode': st.st_mode,
  239. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  240. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  241. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  242. })
  243. def process_chunk(self, data, cache):
  244. id = self.crypto.id_hash(data)
  245. try:
  246. return self.chunk_idx[id]
  247. except KeyError:
  248. idx = len(self.chunks)
  249. size = cache.add_chunk(id, data, self.crypto)
  250. self.chunks.append((id, size))
  251. self.chunk_idx[id] = idx
  252. return idx
  253. @staticmethod
  254. def list_archives(store, crypto):
  255. for id in store.list(NS_ARCHIVES):
  256. archive = Archive(store, crypto)
  257. archive.load(id)
  258. yield archive