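"""Archive support: create, list, extract, verify and delete backup
archives stored as zlib-compressed msgpack objects in a
content-addressed store."""
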
from datetime import datetime
import hashlib
import logging
import msgpack
import os
import stat
import sys
import zlib

from .cache import NS_ARCHIVES, NS_CHUNKS
from .chunkifier import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid

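# Target chunk size in bytes handed to chunkify() when slicing file data.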
CHUNK_SIZE = 55001


class Archive(object):

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        self.hard_links = {}
        if name:
            self.open(name)
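
    # Load an existing archive: fetch its msgpack blob by the id recorded
    # in the cache, verify the SHA-256 digest, then unpack the metadata.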
    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        version = archive.get('version')
        if version != 1:
            raise Exception('Archive version %r not supported' % version)
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]
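
    # Serialize the archive metadata (msgpack, then zlib) and store it
    # under its own SHA-256 digest.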
    def save(self, name):
        archive = {
            'version': 1,
            'name': name,
            'cmdline': ' '.join(sys.argv),
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks,
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()
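
    # Record a chunk reference, deduplicated within this archive, and
    # return the chunk's index into self.chunks.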
    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
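
    # Size totals: osize is the original file bytes, csize the stored
    # (compressed) bytes, usize the bytes unique to this archive
    # (chunks with a reference count of one).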
    def stats(self, cache):
        osize = csize = usize = 0
        for item in self.items:
            if item['type'] == 'FILE':
                osize += item['size']
        for id, size in self.chunks:
            csize += size
            if self.cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize

    def list(self):
        for item in self.items:
            print item['path']
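
    # Restore items below dest.  Directory metadata is applied only once
    # everything inside a directory has been extracted, so the files
    # written into it do not clobber its timestamps.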
    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        dir_stat_queue = []
        for item in self.items:
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
                dir_stat_queue.append((path, item))
                continue
            elif item['type'] == 'SYMLINK':
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item['source']
                logging.info('%s -> %s', path, source)
                if os.path.exists(path):
                    os.unlink(path)
                os.symlink(source, path)
                self.restore_stat(path, item, call_utime=False)
            elif item['type'] == 'HARDLINK':
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = os.path.join(dest, item['source'])
                logging.info('%s => %s', path, source)
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        data = self.store.get(NS_CHUNKS, id)
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        data = zlib.decompress(data)
                        fd.write(data)
                self.restore_stat(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item['type'])
            if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
                self.restore_stat(*dir_stat_queue.pop())
        # Flush directories still queued once the item loop ends; without
        # this, the stats of the last directories were never restored.
        while dir_stat_queue:
            self.restore_stat(*dir_stat_queue.pop())
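
    # Re-apply mode, ownership and timestamps to an extracted item.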
    def restore_stat(self, path, item, call_utime=True):
        os.lchmod(path, item['mode'])
        uid = user2uid(item['user']) or item['uid']
        gid = group2gid(item['group']) or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if call_utime:
            # FIXME: We should really call futimes here (c extension required)
            # No atime is stored, so ctime stands in for the atime slot.
            os.utime(path, (item['ctime'], item['mtime']))
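
    # Re-fetch every chunk of every file and check the embedded SHA-256,
    # logging one OK/ERROR line per file.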
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    logging.info('%s ... OK', item['path'])
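
    # Remove the archive object itself and drop one reference from each
    # chunk it used, then persist the updated cache.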
    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
        for id, size in self.chunks:
            cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()
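
    # Depth-first walk yielding (path, lstat result) for path and for
    # everything below it.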
    def _walk(self, path):
        st = os.lstat(path)
        yield path, st
        if stat.S_ISDIR(st.st_mode):
            for f in os.listdir(path):
                for x in self._walk(os.path.join(path, f)):
                    yield x
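
    # Build a new archive from the given paths and register it in the
    # cache under `name`.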
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for root in paths:
            for path, st in self._walk(unicode(root)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_symlink(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()
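
    # The process_* helpers turn an lstat() result into an archive item
    # dict; paths are stored relative, with leading '/', '\\' and ':'
    # characters stripped.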
    def process_dir(self, path, st):
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({
            'type': 'DIRECTORY', 'path': path,
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'ctime': st.st_ctime, 'mtime': st.st_mtime,
        })

    def process_symlink(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s -> %s', path, source)
        self.items.append({
            'type': 'SYMLINK', 'path': path, 'source': source,
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'ctime': st.st_ctime, 'mtime': st.st_mtime,
        })
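
    # Regular files: the first path seen for a multiply-linked inode is
    # stored as a FILE item; later paths for the same inode become
    # HARDLINK items pointing back at it.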
    def process_file(self, path, st):
        safe_path = path.lstrip('/\\:')
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                logging.info('%s => %s', safe_path, source)
                # Store the sanitized path; extract() asserts that item
                # paths are relative.
                self.items.append({'type': 'HARDLINK',
                                   'path': safe_path, 'source': source})
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        try:
            fd = open(path, 'rb')
        except IOError, e:
            logging.error(e)
            return
        with fd:
            logging.info(safe_path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                chunks.append(self.process_chunk(chunk))
                size += len(chunk)
        self.items.append({
            'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'ctime': st.st_ctime, 'mtime': st.st_mtime,
        })
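
    # Hash a chunk and add it through the cache, which deduplicates
    # across archives; returns the chunk's index in this archive.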
    def process_chunk(self, data):
        id = hashlib.sha256(data).digest()
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            size = self.cache.add_chunk(id, data)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
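

# Usage sketch (illustrative only; `store` and `cache` are assumed to be
# the Store and Cache objects provided by the surrounding package):
#
#     archive = Archive(store, cache)
#     archive.create('monday', ['/home/user'], cache)
#     Archive(store, cache, 'monday').extract('/tmp/restore')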