# archive.py
  1. from datetime import datetime
  2. from getpass import getuser
  3. import logging
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK
  10. from .chunkifier import chunkify
  11. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  12. IntegrityError, format_file_mode, format_time
# Target chunk size (in bytes) handed to chunkify() in process_file().
CHUNK_SIZE = 55001
# os.lchmod exists only on some platforms (e.g. BSD/OS X); restore_stat()
# falls back to os.chmod for non-symlinks where it is missing.
have_lchmod = hasattr(os, 'lchmod')
  15. class Archive(object):
  16. def __init__(self, store, crypto, name=None):
  17. self.crypto = crypto
  18. self.store = store
  19. self.items = []
  20. self.chunks = []
  21. self.chunk_idx = {}
  22. self.hard_links = {}
  23. if name:
  24. self.load(self.crypto.id_hash(name))
  25. def load(self, id):
  26. self.id = id
  27. data, self.hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
  28. self.metadata = msgpack.unpackb(data)
  29. assert self.metadata['version'] == 1
  30. def get_items(self):
  31. data, chunks_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
  32. chunks = msgpack.unpackb(data)
  33. assert chunks['version'] == 1
  34. assert self.metadata['chunks_hash'] == chunks_hash
  35. self.chunks = chunks['chunks']
  36. data, items_hash = self.crypto.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
  37. items = msgpack.unpackb(data)
  38. assert items['version'] == 1
  39. assert self.metadata['items_hash'] == items_hash
  40. self.items = items['items']
  41. for i, chunk in enumerate(self.chunks):
  42. self.chunk_idx[i] = chunk[0]
  43. def save(self, name):
  44. self.id = self.crypto.id_hash(name)
  45. chunks = {'version': 1, 'chunks': self.chunks}
  46. data, chunks_hash = self.crypto.encrypt_create(msgpack.packb(chunks))
  47. self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
  48. items = {'version': 1, 'items': self.items}
  49. data, items_hash = self.crypto.encrypt_read(msgpack.packb(items))
  50. self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
  51. metadata = {
  52. 'version': 1,
  53. 'name': name,
  54. 'chunks_hash': chunks_hash,
  55. 'items_hash': items_hash,
  56. 'cmdline': sys.argv,
  57. 'hostname': socket.gethostname(),
  58. 'username': getuser(),
  59. 'time': datetime.utcnow().isoformat(),
  60. }
  61. data, self.hash = self.crypto.encrypt_read(msgpack.packb(metadata))
  62. self.store.put(NS_ARCHIVE_METADATA, self.id, data)
  63. self.store.commit()
  64. def add_chunk(self, id, size):
  65. try:
  66. return self.chunk_idx[id]
  67. except KeyError:
  68. idx = len(self.chunks)
  69. self.chunks.append((id, size))
  70. self.chunk_idx[id] = idx
  71. return idx
  72. def stats(self, cache):
  73. self.get_items()
  74. osize = csize = usize = 0
  75. for item in self.items:
  76. if item['type'] == 'FILE':
  77. osize += item['size']
  78. for id, size in self.chunks:
  79. csize += size
  80. if cache.seen_chunk(id) == 1:
  81. usize += size
  82. return osize, csize, usize
  83. def list(self):
  84. tmap = dict(FILE='-', DIRECTORY='d', SYMLINK='l')
  85. self.get_items()
  86. for item in self.items:
  87. type = tmap[item['type']]
  88. mode = format_file_mode(item['mode'])
  89. size = item.get('size', 0)
  90. mtime = format_time(datetime.fromtimestamp(item['mtime']))
  91. print '%s%s %-6s %-6s %8d %s %s' % (type, mode, item['user'],
  92. item['group'], size, mtime, item['path'])
  93. def extract(self, dest=None):
  94. self.get_items()
  95. dest = dest or os.getcwdu()
  96. dir_stat_queue = []
  97. for item in self.items:
  98. assert item['path'][0] not in ('/', '\\', ':')
  99. path = os.path.join(dest, item['path'].decode('utf-8'))
  100. if item['type'] == 'DIRECTORY':
  101. logging.info(path)
  102. if not os.path.exists(path):
  103. os.makedirs(path)
  104. dir_stat_queue.append((path, item))
  105. continue
  106. elif item['type'] == 'SYMLINK':
  107. if not os.path.exists(os.path.dirname(path)):
  108. os.makedirs(os.path.dirname(path))
  109. source = item['source']
  110. logging.info('%s -> %s', path, source)
  111. if os.path.exists(path):
  112. os.unlink(path)
  113. os.symlink(source, path)
  114. self.restore_stat(path, item, symlink=True)
  115. elif item['type'] == 'HARDLINK':
  116. if not os.path.exists(os.path.dirname(path)):
  117. os.makedirs(os.path.dirname(path))
  118. source = os.path.join(dest, item['source'])
  119. logging.info('%s => %s', path, source)
  120. if os.path.exists(path):
  121. os.unlink(path)
  122. os.link(source, path)
  123. elif item['type'] == 'FILE':
  124. logging.info(path)
  125. if not os.path.exists(os.path.dirname(path)):
  126. os.makedirs(os.path.dirname(path))
  127. with open(path, 'wb') as fd:
  128. for chunk in item['chunks']:
  129. id = self.chunk_idx[chunk]
  130. try:
  131. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  132. if self.crypto.id_hash(data) != id:
  133. raise IntegrityError('chunk id did not match')
  134. fd.write(data)
  135. except ValueError:
  136. raise Exception('Invalid chunk checksum')
  137. self.restore_stat(path, item)
  138. else:
  139. raise Exception('Unknown archive item type %r' % item['type'])
  140. if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
  141. self.restore_stat(*dir_stat_queue.pop())
  142. def restore_stat(self, path, item, symlink=False):
  143. if have_lchmod:
  144. os.lchmod(path, item['mode'])
  145. elif not symlink:
  146. os.chmod(path, item['mode'])
  147. uid = user2uid(item['user']) or item['uid']
  148. gid = group2gid(item['group']) or item['gid']
  149. try:
  150. os.lchown(path, uid, gid)
  151. except OSError:
  152. pass
  153. if not symlink:
  154. # FIXME: We should really call futimes here (c extension required)
  155. os.utime(path, (item['ctime'], item['mtime']))
  156. def verify(self):
  157. self.get_items()
  158. for item in self.items:
  159. if item['type'] == 'FILE':
  160. item['path'] = item['path'].decode('utf-8')
  161. for chunk in item['chunks']:
  162. id = self.chunk_idx[chunk]
  163. try:
  164. data, hash = self.crypto.decrypt(self.store.get(NS_CHUNK, id))
  165. if self.crypto.id_hash(data) != id:
  166. raise IntegrityError('chunk id did not match')
  167. except IntegrityError:
  168. logging.error('%s ... ERROR', item['path'])
  169. break
  170. else:
  171. logging.info('%s ... OK', item['path'])
  172. def delete(self, cache):
  173. self.get_items()
  174. self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
  175. self.store.delete(NS_ARCHIVE_ITEMS, self.id)
  176. self.store.delete(NS_ARCHIVE_METADATA, self.id)
  177. for id, size in self.chunks:
  178. cache.chunk_decref(id)
  179. self.store.commit()
  180. cache.save()
  181. def _walk(self, path):
  182. st = os.lstat(path)
  183. yield path, st
  184. if stat.S_ISDIR(st.st_mode):
  185. for f in os.listdir(path):
  186. for x in self._walk(os.path.join(path, f)):
  187. yield x
  188. def create(self, name, paths, cache):
  189. id = self.crypto.id_hash(name)
  190. try:
  191. self.store.get(NS_ARCHIVE_METADATA, id)
  192. except self.store.DoesNotExist:
  193. pass
  194. else:
  195. raise NameError('Archive already exists')
  196. for path in paths:
  197. for path, st in self._walk(unicode(path)):
  198. if stat.S_ISDIR(st.st_mode):
  199. self.process_dir(path, st)
  200. elif stat.S_ISLNK(st.st_mode):
  201. self.process_symlink(path, st)
  202. elif stat.S_ISREG(st.st_mode):
  203. self.process_file(path, st, cache)
  204. else:
  205. logging.error('Unknown file type: %s', path)
  206. self.save(name)
  207. cache.save()
  208. def process_dir(self, path, st):
  209. path = path.lstrip('/\\:')
  210. logging.info(path)
  211. self.items.append({
  212. 'type': 'DIRECTORY', 'path': path,
  213. 'mode': st.st_mode,
  214. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  215. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  216. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  217. })
  218. def process_symlink(self, path, st):
  219. source = os.readlink(path)
  220. path = path.lstrip('/\\:')
  221. logging.info('%s -> %s', path, source)
  222. self.items.append({
  223. 'type': 'SYMLINK', 'path': path, 'source': source,
  224. 'mode': st.st_mode,
  225. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  226. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  227. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  228. })
  229. def process_file(self, path, st, cache):
  230. safe_path = path.lstrip('/\\:')
  231. # Is it a hard link?
  232. if st.st_nlink > 1:
  233. source = self.hard_links.get((st.st_ino, st.st_dev))
  234. if (st.st_ino, st.st_dev) in self.hard_links:
  235. logging.info('%s => %s', path, source)
  236. self.items.append({ 'type': 'HARDLINK',
  237. 'path': path, 'source': source})
  238. return
  239. else:
  240. self.hard_links[st.st_ino, st.st_dev] = safe_path
  241. logging.info(safe_path)
  242. path_hash = self.crypto.id_hash(path.encode('utf-8'))
  243. ids, size = cache.file_known_and_unchanged(path_hash, st)
  244. if ids is not None:
  245. # Make sure all ids are available
  246. for id in ids:
  247. if not cache.seen_chunk(id):
  248. ids = None
  249. break
  250. else:
  251. chunks = [self.process_chunk2(id, cache) for id in ids]
  252. # Only chunkify the file if needed
  253. if ids is None:
  254. try:
  255. fd = open(path, 'rb')
  256. except IOError, e:
  257. logging.error(e)
  258. return
  259. with fd:
  260. size = 0
  261. ids = []
  262. chunks = []
  263. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  264. id = self.crypto.id_hash(chunk)
  265. ids.append(id)
  266. try:
  267. chunks.append(self.chunk_idx[id])
  268. except KeyError:
  269. chunks.append(self.process_chunk(id, chunk, cache))
  270. size += len(chunk)
  271. cache.memorize_file_chunks(path_hash, st, ids)
  272. self.items.append({
  273. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  274. 'mode': st.st_mode,
  275. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  276. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  277. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  278. })
  279. def process_chunk2(self, id, cache):
  280. try:
  281. return self.chunk_idx[id]
  282. except KeyError:
  283. idx = len(self.chunks)
  284. size = cache.chunk_incref(id)
  285. self.chunks.append((id, size))
  286. self.chunk_idx[id] = idx
  287. return idx
  288. def process_chunk(self, id, data, cache):
  289. idx = len(self.chunks)
  290. size = cache.add_chunk(id, data)
  291. self.chunks.append((id, size))
  292. self.chunk_idx[id] = idx
  293. return idx
  294. @staticmethod
  295. def list_archives(store, crypto):
  296. for id in list(store.list(NS_ARCHIVE_METADATA)):
  297. archive = Archive(store, crypto)
  298. archive.load(id)
  299. yield archive