# archive.py
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
  12. CHUNK_SIZE = 55001
  13. class Archive(object):
  14. def __init__(self, store, crypto, name=None):
  15. self.crypto = crypto
  16. self.store = store
  17. self.items = []
  18. self.chunks = []
  19. self.chunk_idx = {}
  20. self.hard_links = {}
  21. if name:
  22. self.load(self.crypto.id_hash(name))
  23. def load(self, id):
  24. self.id = id
  25. data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  26. self.metadata = msgpack.unpackb(data)
  27. assert self.metadata['version'] == 1
  28. def get_items(self):
  29. data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
  30. chunks = msgpack.unpackb(data)
  31. assert chunks['version'] == 1
  32. assert self.metadata['chunks_hash'] == chunks_hash
  33. self.chunks = chunks['chunks']
  34. data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
  35. items = msgpack.unpackb(data)
  36. assert items['version'] == 1
  37. assert self.metadata['items_hash'] == items_hash
  38. self.items = items['items']
  39. for i, chunk in enumerate(self.chunks):
  40. self.chunk_idx[i] = chunk[0]
  41. def save(self, name):
  42. self.id = self.crypto.id_hash(name)
  43. chunks = {'version': 1, 'chunks': self.chunks}
  44. data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
  45. self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
  46. items = {'version': 1, 'items': self.items}
  47. data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
  48. self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
  49. metadata = {
  50. 'version': 1,
  51. 'name': name,
  52. 'chunks_hash': chunks_hash,
  53. 'items_hash': items_hash,
  54. 'cmdline': sys.argv,
  55. 'hostname': socket.gethostname(),
  56. 'username': getuser(),
  57. 'time': datetime.utcnow().isoformat(),
  58. }
  59. data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
  60. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  61. self.store.commit()
  62. def add_chunk(self, id, size):
  63. try:
  64. return self.chunk_idx[id]
  65. except KeyError:
  66. idx = len(self.chunks)
  67. self.chunks.append((id, size))
  68. self.chunk_idx[id] = idx
  69. return idx
  70. def stats(self, cache):
  71. self.get_items()
  72. osize = csize = usize = 0
  73. for item in self.items:
  74. if item['type'] == 'FILE':
  75. osize += item['size']
  76. for id, size in self.chunks:
  77. csize += size
  78. if cache.seen_chunk(id) == 1:
  79. usize += size
  80. return osize, csize, usize
  81. def list(self):
  82. self.get_items()
  83. for item in self.items:
  84. print item['path']
  85. def extract(self, dest=None):
  86. self.get_items()
  87. dest = dest or os.getcwdu()
  88. dir_stat_queue = []
  89. for item in self.items:
  90. assert item['path'][0] not in ('/', '\\', ':')
  91. path = os.path.join(dest, item['path'].decode('utf-8'))
  92. if item['type'] == 'DIRECTORY':
  93. logging.info(path)
  94. if not os.path.exists(path):
  95. os.makedirs(path)
  96. dir_stat_queue.append((path, item))
  97. continue
  98. elif item['type'] == 'SYMLINK':
  99. if not os.path.exists(os.path.dirname(path)):
  100. os.makedirs(os.path.dirname(path))
  101. source = item['source']
  102. logging.info('%s -> %s', path, source)
  103. if os.path.exists(path):
  104. os.unlink(path)
  105. os.symlink(source, path)
  106. self.restore_stat(path, item, symlink=True)
  107. elif item['type'] == 'HARDLINK':
  108. if not os.path.exists(os.path.dirname(path)):
  109. os.makedirs(os.path.dirname(path))
  110. source = os.path.join(dest, item['source'])
  111. logging.info('%s => %s', path, source)
  112. if os.path.exists(path):
  113. os.unlink(path)
  114. os.link(source, path)
  115. elif item['type'] == 'FILE':
  116. logging.info(path)
  117. if not os.path.exists(os.path.dirname(path)):
  118. os.makedirs(os.path.dirname(path))
  119. with open(path, 'wb') as fd:
  120. for chunk in item['chunks']:
  121. id = self.chunk_idx[chunk]
  122. try:
  123. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  124. if self.crypto.id_hash(data) != id:
  125. raise IntegrityError('chunk id did not match')
  126. fd.write(data)
  127. except ValueError:
  128. raise Exception('Invalid chunk checksum')
  129. self.restore_stat(path, item)
  130. else:
  131. raise Exception('Unknown archive item type %r' % item['type'])
  132. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  133. self.restore_stat(*dir_stat_queue.pop())
  134. def restore_stat(self, path, item, symlink=False):
  135. os.lchmod(path, item['mode'])
  136. uid = user2uid(item['user']) or item['uid']
  137. gid = group2gid(item['group']) or item['gid']
  138. try:
  139. if hasattr(os, 'lchown'): # Not available on Linux
  140. os.lchown(path, uid, gid)
  141. elif not symlink:
  142. os.chown(path, uid, gid)
  143. except OSError:
  144. pass
  145. if not symlink:
  146. # FIXME: We should really call futimes here (c extension required)
  147. os.utime(path, (item['ctime'], item['mtime']))
  148. def verify(self):
  149. self.get_items()
  150. for item in self.items:
  151. if item['type'] == 'FILE':
  152. item['path'] = item['path'].decode('utf-8')
  153. for chunk in item['chunks']:
  154. id = self.chunk_idx[chunk]
  155. try:
  156. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  157. if self.crypto.id_hash(data) != id:
  158. raise IntegrityError('chunk id did not match')
  159. except IntegrityError:
  160. logging.error('%s ... ERROR', item['path'])
  161. break
  162. else:
  163. logging.info('%s ... OK', item['path'])
  164. def delete(self, cache):
  165. self.get_items()
  166. self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
  167. self.store.delete(NS_ARCHIVE_ITEMS, self.id)
  168. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  169. for id, size in self.chunks:
  170. cache.chunk_decref(id)
  171. self.store.commit()
  172. cache.save()
  173. def _walk(self, path):
  174. st = os.lstat(path)
  175. yield path, st
  176. if stat.S_ISDIR(st.st_mode):
  177. for f in os.listdir(path):
  178. for x in self._walk(os.path.join(path, f)):
  179. yield x
  180. def create(self, name, paths, cache):
  181. id = self.crypto.id_hash(name)
  182. try:
  183. self.store.get(NS_ARCHIVE_METADATA, id)
  184. except self.store.DoesNotExist:
  185. pass
  186. else:
  187. raise NameError('Archive already exists')
  188. for path in paths:
  189. for path, st in self._walk(unicode(path)):
  190. if stat.S_ISDIR(st.st_mode):
  191. self.process_dir(path, st)
  192. elif stat.S_ISLNK(st.st_mode):
  193. self.process_symlink(path, st)
  194. elif stat.S_ISREG(st.st_mode):
  195. self.process_file(path, st, cache)
  196. else:
  197. logging.error('Unknown file type: %s', path)
  198. self.save(name)
  199. cache.save()
  200. def process_dir(self, path, st):
  201. path = path.lstrip('/\\:')
  202. logging.info(path)
  203. self.items.append({
  204. 'type': 'DIRECTORY', 'path': path,
  205. 'mode': st.st_mode,
  206. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  207. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  208. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  209. })
  210. def process_symlink(self, path, st):
  211. source = os.readlink(path)
  212. path = path.lstrip('/\\:')
  213. logging.info('%s -> %s', path, source)
  214. self.items.append({
  215. 'type': 'SYMLINK', 'path': path, 'source': source,
  216. 'mode': st.st_mode,
  217. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  218. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  219. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  220. })
  221. def process_file(self, path, st, cache):
  222. safe_path = path.lstrip('/\\:')
  223. if st.st_nlink > 1:
  224. source = self.hard_links.get((st.st_ino, st.st_dev))
  225. if (st.st_ino, st.st_dev) in self.hard_links:
  226. logging.info('%s => %s', path, source)
  227. self.items.append({ 'type': 'HARDLINK',
  228. 'path': path, 'source': source})
  229. return
  230. else:
  231. self.hard_links[st.st_ino, st.st_dev] = safe_path
  232. try:
  233. fd = open(path, 'rb')
  234. except IOError, e:
  235. logging.error(e)
  236. return
  237. with fd:
  238. logging.info(safe_path)
  239. chunks = []
  240. size = 0
  241. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  242. chunks.append(self.process_chunk(chunk, cache))
  243. size += len(chunk)
  244. self.items.append({
  245. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  246. 'mode': st.st_mode,
  247. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  248. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  249. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  250. })
  251. def process_chunk(self, data, cache):
  252. id = self.crypto.id_hash(data)
  253. try:
  254. return self.chunk_idx[id]
  255. except KeyError:
  256. idx = len(self.chunks)
  257. size = cache.add_chunk(id, data, self.crypto)
  258. self.chunks.append((id, size))
  259. self.chunk_idx[id] = idx
  260. return idx
  261. @staticmethod
  262. def list_archives(store, crypto):
  263. for id in list(store.list(NS_ARCHIVE_METADATA)):
  264. archive = Archive(store, crypto)
  265. archive.load(id)
  266. yield archive