# archive.py
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
  12. CHUNK_SIZE = 55001
  13. class Archive(object):
  14. def __init__(self, store, crypto, name=None):
  15. self.crypto = crypto
  16. self.store = store
  17. self.items = []
  18. self.chunks = []
  19. self.chunk_idx = {}
  20. self.hard_links = {}
  21. if name:
  22. self.load(self.crypto.id_hash(name))
  23. def load(self, id):
  24. self.id = id
  25. data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  26. self.metadata = msgpack.unpackb(data)
  27. assert self.metadata['version'] == 1
  28. def get_items(self):
  29. data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
  30. chunks = msgpack.unpackb(data)
  31. assert chunks['version'] == 1
  32. assert self.metadata['chunks_hash'] == chunks_hash
  33. self.chunks = chunks['chunks']
  34. data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
  35. items = msgpack.unpackb(data)
  36. assert items['version'] == 1
  37. assert self.metadata['items_hash'] == items_hash
  38. self.items = items['items']
  39. for i, chunk in enumerate(self.chunks):
  40. self.chunk_idx[i] = chunk[0]
  41. def save(self, name):
  42. self.id = self.crypto.id_hash(name)
  43. chunks = {'version': 1, 'chunks': self.chunks}
  44. data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
  45. self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
  46. items = {'version': 1, 'items': self.items}
  47. data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
  48. self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
  49. metadata = {
  50. 'version': 1,
  51. 'name': name,
  52. 'chunks_hash': chunks_hash,
  53. 'items_hash': items_hash,
  54. 'cmdline': sys.argv,
  55. 'hostname': socket.gethostname(),
  56. 'username': getuser(),
  57. 'time': datetime.utcnow().isoformat(),
  58. }
  59. data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
  60. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  61. self.store.commit()
  62. def add_chunk(self, id, size):
  63. try:
  64. return self.chunk_idx[id]
  65. except KeyError:
  66. idx = len(self.chunks)
  67. self.chunks.append((id, size))
  68. self.chunk_idx[id] = idx
  69. return idx
  70. def stats(self, cache):
  71. self.get_items()
  72. osize = csize = usize = 0
  73. for item in self.items:
  74. if item['type'] == 'FILE':
  75. osize += item['size']
  76. for id, size in self.chunks:
  77. csize += size
  78. if cache.seen_chunk(id) == 1:
  79. usize += size
  80. return osize, csize, usize
  81. def list(self):
  82. for item in self.items:
  83. print item['path']
  84. def extract(self, dest=None):
  85. self.get_items()
  86. dest = dest or os.getcwdu()
  87. dir_stat_queue = []
  88. for item in self.items:
  89. assert item['path'][0] not in ('/', '\\', ':')
  90. path = os.path.join(dest, item['path'].decode('utf-8'))
  91. if item['type'] == 'DIRECTORY':
  92. logging.info(path)
  93. if not os.path.exists(path):
  94. os.makedirs(path)
  95. dir_stat_queue.append((path, item))
  96. continue
  97. elif item['type'] == 'SYMLINK':
  98. if not os.path.exists(os.path.dirname(path)):
  99. os.makedirs(os.path.dirname(path))
  100. source = item['source']
  101. logging.info('%s -> %s', path, source)
  102. if os.path.exists(path):
  103. os.unlink(path)
  104. os.symlink(source, path)
  105. self.restore_stat(path, item, symlink=True)
  106. elif item['type'] == 'HARDLINK':
  107. if not os.path.exists(os.path.dirname(path)):
  108. os.makedirs(os.path.dirname(path))
  109. source = os.path.join(dest, item['source'])
  110. logging.info('%s => %s', path, source)
  111. if os.path.exists(path):
  112. os.unlink(path)
  113. os.link(source, path)
  114. elif item['type'] == 'FILE':
  115. logging.info(path)
  116. if not os.path.exists(os.path.dirname(path)):
  117. os.makedirs(os.path.dirname(path))
  118. with open(path, 'wb') as fd:
  119. for chunk in item['chunks']:
  120. id = self.chunk_idx[chunk]
  121. try:
  122. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  123. if self.crypto.id_hash(data) != id:
  124. raise IntegrityError('chunk id did not match')
  125. fd.write(data)
  126. except ValueError:
  127. raise Exception('Invalid chunk checksum')
  128. self.restore_stat(path, item)
  129. else:
  130. raise Exception('Unknown archive item type %r' % item['type'])
  131. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  132. self.restore_stat(*dir_stat_queue.pop())
  133. def restore_stat(self, path, item, symlink=False):
  134. os.lchmod(path, item['mode'])
  135. uid = user2uid(item['user']) or item['uid']
  136. gid = group2gid(item['group']) or item['gid']
  137. try:
  138. if hasattr(os, 'lchown'): # Not available on Linux
  139. os.lchown(path, uid, gid)
  140. elif not symlink:
  141. os.chown(path, uid, gid)
  142. except OSError:
  143. pass
  144. if not symlink:
  145. # FIXME: We should really call futimes here (c extension required)
  146. os.utime(path, (item['ctime'], item['mtime']))
  147. def verify(self):
  148. self.get_items()
  149. for item in self.items:
  150. if item['type'] == 'FILE':
  151. item['path'] = item['path'].decode('utf-8')
  152. for chunk in item['chunks']:
  153. id = self.chunk_idx[chunk]
  154. try:
  155. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  156. if self.crypto.id_hash(data) != id:
  157. raise IntegrityError('chunk id did not match')
  158. except IntegrityError:
  159. logging.error('%s ... ERROR', item['path'])
  160. break
  161. else:
  162. logging.info('%s ... OK', item['path'])
  163. def delete(self, cache):
  164. self.get_items()
  165. self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
  166. self.store.delete(NS_ARCHIVE_ITEMS, self.id)
  167. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  168. for id, size in self.chunks:
  169. cache.chunk_decref(id)
  170. self.store.commit()
  171. cache.save()
  172. def _walk(self, path):
  173. st = os.lstat(path)
  174. yield path, st
  175. if stat.S_ISDIR(st.st_mode):
  176. for f in os.listdir(path):
  177. for x in self._walk(os.path.join(path, f)):
  178. yield x
  179. def create(self, name, paths, cache):
  180. id = self.crypto.id_hash(name)
  181. try:
  182. self.store.get(NS_ARCHIVE_METADATA, id)
  183. except self.store.DoesNotExist:
  184. pass
  185. else:
  186. raise NameError('Archive already exists')
  187. for path in paths:
  188. for path, st in self._walk(unicode(path)):
  189. if stat.S_ISDIR(st.st_mode):
  190. self.process_dir(path, st)
  191. elif stat.S_ISLNK(st.st_mode):
  192. self.process_symlink(path, st)
  193. elif stat.S_ISREG(st.st_mode):
  194. self.process_file(path, st, cache)
  195. else:
  196. logging.error('Unknown file type: %s', path)
  197. self.save(name)
  198. cache.save()
  199. def process_dir(self, path, st):
  200. path = path.lstrip('/\\:')
  201. logging.info(path)
  202. self.items.append({
  203. 'type': 'DIRECTORY', 'path': path,
  204. 'mode': st.st_mode,
  205. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  206. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  207. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  208. })
  209. def process_symlink(self, path, st):
  210. source = os.readlink(path)
  211. path = path.lstrip('/\\:')
  212. logging.info('%s -> %s', path, source)
  213. self.items.append({
  214. 'type': 'SYMLINK', 'path': path, 'source': source,
  215. 'mode': st.st_mode,
  216. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  217. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  218. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  219. })
  220. def process_file(self, path, st, cache):
  221. safe_path = path.lstrip('/\\:')
  222. if st.st_nlink > 1:
  223. source = self.hard_links.get((st.st_ino, st.st_dev))
  224. if (st.st_ino, st.st_dev) in self.hard_links:
  225. logging.info('%s => %s', path, source)
  226. self.items.append({ 'type': 'HARDLINK',
  227. 'path': path, 'source': source})
  228. return
  229. else:
  230. self.hard_links[st.st_ino, st.st_dev] = safe_path
  231. try:
  232. fd = open(path, 'rb')
  233. except IOError, e:
  234. logging.error(e)
  235. return
  236. with fd:
  237. logging.info(safe_path)
  238. chunks = []
  239. size = 0
  240. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  241. chunks.append(self.process_chunk(chunk, cache))
  242. size += len(chunk)
  243. self.items.append({
  244. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  245. 'mode': st.st_mode,
  246. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  247. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  248. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  249. })
  250. def process_chunk(self, data, cache):
  251. id = self.crypto.id_hash(data)
  252. try:
  253. return self.chunk_idx[id]
  254. except KeyError:
  255. idx = len(self.chunks)
  256. size = cache.add_chunk(id, data, self.crypto)
  257. self.chunks.append((id, size))
  258. self.chunk_idx[id] = idx
  259. return idx
  260. @staticmethod
  261. def list_archives(store, crypto):
  262. for id in list(store.list(NS_ARCHIVE_METADATA)):
  263. archive = Archive(store, crypto)
  264. archive.load(id)
  265. yield archive