archive.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
# Target chunk size in bytes handed to chunkify() in process_file().
# NOTE(review): 55001 looks deliberately chosen (~55 KB, non-round) -- confirm
# against the chunker's seed/window requirements before changing.
CHUNK_SIZE = 55001
  13. class Archive(object):
  14. def __init__(self, store, crypto, name=None):
  15. self.crypto = crypto
  16. self.store = store
  17. self.items = []
  18. self.chunks = []
  19. self.chunk_idx = {}
  20. self.hard_links = {}
  21. if name:
  22. self.load(self.crypto.id_hash(name))
  23. def load(self, id):
  24. self.id = id
  25. data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  26. self.metadata = msgpack.unpackb(data)
  27. assert self.metadata['version'] == 1
  28. def get_items(self):
  29. data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
  30. chunks = msgpack.unpackb(data)
  31. assert chunks['version'] == 1
  32. assert self.metadata['chunks_hash'] == chunks_hash
  33. self.chunks = chunks['chunks']
  34. data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
  35. items = msgpack.unpackb(data)
  36. assert items['version'] == 1
  37. assert self.metadata['items_hash'] == items_hash
  38. self.items = items['items']
  39. for i, chunk in enumerate(self.chunks):
  40. self.chunk_idx[i] = chunk[0]
  41. def save(self, name):
  42. self.id = self.crypto.id_hash(name)
  43. chunks = {'version': 1, 'chunks': self.chunks}
  44. data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
  45. self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
  46. items = {'version': 1, 'items': self.items}
  47. data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
  48. self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
  49. metadata = {
  50. 'version': 1,
  51. 'name': name,
  52. 'chunks_hash': chunks_hash,
  53. 'items_hash': items_hash,
  54. 'cmdline': sys.argv,
  55. 'hostname': socket.gethostname(),
  56. 'username': getuser(),
  57. 'time': datetime.utcnow().isoformat(),
  58. }
  59. data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
  60. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  61. self.store.commit()
  62. def add_chunk(self, id, size):
  63. try:
  64. return self.chunk_idx[id]
  65. except KeyError:
  66. idx = len(self.chunks)
  67. self.chunks.append((id, size))
  68. self.chunk_idx[id] = idx
  69. return idx
  70. def stats(self, cache):
  71. self.get_items()
  72. osize = csize = usize = 0
  73. for item in self.items:
  74. if item['type'] == 'FILE':
  75. osize += item['size']
  76. for id, size in self.chunks:
  77. csize += size
  78. if cache.seen_chunk(id) == 1:
  79. usize += size
  80. return osize, csize, usize
  81. def list(self):
  82. self.get_items()
  83. for item in self.items:
  84. mode = str(item['mode'])
  85. size = item.get('size', 0)
  86. mtime = datetime.fromtimestamp(item['mtime'])
  87. print '%s %-6s %-6s %8d %s %s' % (mode, item['user'], item['group'],
  88. size, mtime, item['path'])
def extract(self, dest=None):
    """Restore every item of the archive beneath *dest* (default: cwd).

    Directory metadata is applied last, via dir_stat_queue, so that
    writing children does not clobber a directory's restored timestamps.
    NOTE: uses Python 2 APIs (os.getcwdu, str.decode).
    """
    self.get_items()
    dest = dest or os.getcwdu()
    # Directories whose stat info must be restored after their contents.
    dir_stat_queue = []
    for item in self.items:
        # Stored paths are expected to be relative; refuse anything else.
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        if item['type'] == 'DIRECTORY':
            logging.info(path)
            if not os.path.exists(path):
                os.makedirs(path)
            # Defer stat restoration until we leave this subtree (below).
            dir_stat_queue.append((path, item))
            continue
        elif item['type'] == 'SYMLINK':
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            logging.info('%s -> %s', path, source)
            # Replace any existing entry so os.symlink cannot fail on it.
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_stat(path, item, symlink=True)
        elif item['type'] == 'HARDLINK':
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # 'source' is the archive-relative path of the first link seen.
            source = os.path.join(dest, item['source'])
            logging.info('%s => %s', path, source)
            if os.path.exists(path):
                os.unlink(path)
            os.link(source, path)
        elif item['type'] == 'FILE':
            logging.info(path)
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            with open(path, 'wb') as fd:
                for chunk in item['chunks']:
                    # item['chunks'] holds archive-local indexes; map to ids.
                    id = self.chunk_idx[chunk]
                    try:
                        data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
                        # Re-hash the plaintext to detect store corruption.
                        if self.crypto.id_hash(data) != id:
                            raise IntegrityError('chunk id did not match')
                        fd.write(data)
                    except ValueError:
                        # presumably raised by decrypt() on bad ciphertext -- confirm
                        raise Exception('Invalid chunk checksum')
            self.restore_stat(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['type'])
        # Once the current path leaves the last queued directory's subtree,
        # that directory is complete: restore its stat info now.
        if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
            self.restore_stat(*dir_stat_queue.pop())
  138. def restore_stat(self, path, item, symlink=False):
  139. os.lchmod(path, item['mode'])
  140. uid = user2uid(item['user']) or item['uid']
  141. gid = group2gid(item['group']) or item['gid']
  142. try:
  143. if hasattr(os, 'lchown'): # Not available on Linux
  144. os.lchown(path, uid, gid)
  145. elif not symlink:
  146. os.chown(path, uid, gid)
  147. except OSError:
  148. pass
  149. if not symlink:
  150. # FIXME: We should really call futimes here (c extension required)
  151. os.utime(path, (item['ctime'], item['mtime']))
  152. def verify(self):
  153. self.get_items()
  154. for item in self.items:
  155. if item['type'] == 'FILE':
  156. item['path'] = item['path'].decode('utf-8')
  157. for chunk in item['chunks']:
  158. id = self.chunk_idx[chunk]
  159. try:
  160. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  161. if self.crypto.id_hash(data) != id:
  162. raise IntegrityError('chunk id did not match')
  163. except IntegrityError:
  164. logging.error('%s ... ERROR', item['path'])
  165. break
  166. else:
  167. logging.info('%s ... OK', item['path'])
  168. def delete(self, cache):
  169. self.get_items()
  170. self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
  171. self.store.delete(NS_ARCHIVE_ITEMS, self.id)
  172. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  173. for id, size in self.chunks:
  174. cache.chunk_decref(id)
  175. self.store.commit()
  176. cache.save()
  177. def _walk(self, path):
  178. st = os.lstat(path)
  179. yield path, st
  180. if stat.S_ISDIR(st.st_mode):
  181. for f in os.listdir(path):
  182. for x in self._walk(os.path.join(path, f)):
  183. yield x
  184. def create(self, name, paths, cache):
  185. id = self.crypto.id_hash(name)
  186. try:
  187. self.store.get(NS_ARCHIVE_METADATA, id)
  188. except self.store.DoesNotExist:
  189. pass
  190. else:
  191. raise NameError('Archive already exists')
  192. for path in paths:
  193. for path, st in self._walk(unicode(path)):
  194. if stat.S_ISDIR(st.st_mode):
  195. self.process_dir(path, st)
  196. elif stat.S_ISLNK(st.st_mode):
  197. self.process_symlink(path, st)
  198. elif stat.S_ISREG(st.st_mode):
  199. self.process_file(path, st, cache)
  200. else:
  201. logging.error('Unknown file type: %s', path)
  202. self.save(name)
  203. cache.save()
  204. def process_dir(self, path, st):
  205. path = path.lstrip('/\\:')
  206. logging.info(path)
  207. self.items.append({
  208. 'type': 'DIRECTORY', 'path': path,
  209. 'mode': st.st_mode,
  210. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  211. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  212. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  213. })
  214. def process_symlink(self, path, st):
  215. source = os.readlink(path)
  216. path = path.lstrip('/\\:')
  217. logging.info('%s -> %s', path, source)
  218. self.items.append({
  219. 'type': 'SYMLINK', 'path': path, 'source': source,
  220. 'mode': st.st_mode,
  221. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  222. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  223. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  224. })
  225. def process_file(self, path, st, cache):
  226. safe_path = path.lstrip('/\\:')
  227. # Is it a hard link?
  228. if st.st_nlink > 1:
  229. source = self.hard_links.get((st.st_ino, st.st_dev))
  230. if (st.st_ino, st.st_dev) in self.hard_links:
  231. logging.info('%s => %s', path, source)
  232. self.items.append({ 'type': 'HARDLINK',
  233. 'path': path, 'source': source})
  234. return
  235. else:
  236. self.hard_links[st.st_ino, st.st_dev] = safe_path
  237. logging.info(safe_path)
  238. path_hash = self.crypto.id_hash(path.encode('utf-8'))
  239. ids, size = cache.file_known_and_unchanged(path_hash, st)
  240. if ids is not None:
  241. # Make sure all ids are available
  242. for id in ids:
  243. if not cache.seen_chunk(id):
  244. ids = None
  245. break
  246. else:
  247. chunks = [self.process_chunk2(id, cache) for id in ids]
  248. # Only chunkify the file if needed
  249. if ids is None:
  250. try:
  251. fd = open(path, 'rb')
  252. except IOError, e:
  253. logging.error(e)
  254. return
  255. with fd:
  256. size = 0
  257. ids = []
  258. chunks = []
  259. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  260. id = self.crypto.id_hash(chunk)
  261. ids.append(id)
  262. try:
  263. chunks.append(self.chunk_idx[id])
  264. except KeyError:
  265. chunks.append(self.process_chunk(id, chunk, cache))
  266. size += len(chunk)
  267. cache.memorize_file_chunks(path_hash, st, ids)
  268. self.items.append({
  269. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  270. 'mode': st.st_mode,
  271. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  272. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  273. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  274. })
  275. def process_chunk2(self, id, cache):
  276. try:
  277. return self.chunk_idx[id]
  278. except KeyError:
  279. idx = len(self.chunks)
  280. size = cache.chunk_incref(id)
  281. self.chunks.append((id, size))
  282. self.chunk_idx[id] = idx
  283. return idx
  284. def process_chunk(self, id, data, cache):
  285. idx = len(self.chunks)
  286. size = cache.add_chunk(id, data, self.crypto)
  287. self.chunks.append((id, size))
  288. self.chunk_idx[id] = idx
  289. return idx
  290. @staticmethod
  291. def list_archives(store, crypto):
  292. for id in list(store.list(NS_ARCHIVE_METADATA)):
  293. archive = Archive(store, crypto)
  294. archive.load(id)
  295. yield archive