archive.py

from datetime import datetime
import hashlib
import logging
import msgpack
import os
import stat
import sys
import zlib

from .cache import NS_ARCHIVES, NS_CHUNKS
from .chunkifier import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid
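
# Chunk size handed to the chunker when splitting regular files
# (see process_file below); chunks come out at roughly this many bytes.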
CHUNK_SIZE = 55001


class Archive(object):
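    """In-memory representation of a single backup archive.

    An archive is a zlib-compressed, msgpack-encoded dict stored under
    its SHA-256 digest in the NS_ARCHIVES namespace.  It records the
    archived filesystem items and the (id, size) chunks they reference.
    """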

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        if name:
            self.open(name)
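
    # Note: chunk_idx maps chunk index -> chunk id after open(), but
    # chunk id -> chunk index while a new archive is being created
    # (see add_chunk and process_chunk).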
    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        version = archive.get('version')
        if version != 1:
            raise Exception('Archive version %r not supported' % version)
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]
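
    # Serialize the archive metadata, store it under its SHA-256 digest
    # and commit the transaction.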
    def save(self, name):
        archive = {
            'version': 1,
            'name': name,
            'cmdline': ' '.join(sys.argv),
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks,
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()
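
    # Return the chunk list index for id, registering the chunk first
    # if this archive does not reference it yet.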
    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx

    def stats(self, cache):
        # osize: original file size, csize: stored (compressed) size,
        # usize: size of chunks referenced by this archive alone.
        osize = csize = usize = 0
        for item in self.items:
            if item['type'] == 'FILE':
                osize += item['size']
        for id, size in self.chunks:
            csize += size
            if cache.seen_chunk(id) == 1:
                usize += size
        return osize, csize, usize

    def list(self):
        for item in self.items:
            print item['path']
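
    # Recreate the archived items under dest, restoring contents,
    # permissions, ownership and timestamps.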
    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        for item in self.items:
            # Stored paths are relative; refuse anything that could
            # escape the destination directory.
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
            elif item['type'] == 'SYMLINK':
                logging.info('%s => %s', path, item['source'])
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.symlink(item['source'], path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        # Each stored chunk is a 32-byte SHA-256 digest
                        # followed by the zlib-compressed payload.
                        data = self.store.get(NS_CHUNKS, id)
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        fd.write(zlib.decompress(data))
                os.chmod(path, item['mode'])
                uid = user2uid(item['user']) or item['uid']
                gid = group2gid(item['group']) or item['gid']
                try:
                    os.chown(path, uid, gid)
                except OSError:
                    pass
                # utime() takes (atime, mtime); ctime cannot be set, so
                # it is reused for the atime slot.
                os.utime(path, (item['ctime'], item['mtime']))
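
    # Check every file chunk against its stored SHA-256 digest without
    # writing anything to disk.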
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    # No break above: every chunk of this file matched.
                    logging.info('%s ... OK', item['path'])
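
    # Remove the archive metadata and drop one reference from each of
    # its chunks via cache.chunk_decref().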
    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, cache.archives[self.name])
        for id, size in self.chunks:
            cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()
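
    # Depth-first walk yielding (path, lstat result) for path and
    # everything below it.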
    def _walk(self, path):
        st = os.lstat(path)
        # Yield the entry itself before recursing so directories are
        # archived too; otherwise process_dir would never be reached
        # from create().
        yield path, st
        if stat.S_ISDIR(st.st_mode):
            for f in os.listdir(path):
                for x in self._walk(os.path.join(path, f)):
                    yield x
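
    # Build a new archive from the given paths and register it in the
    # cache under name.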
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for root in paths:
            for path, st in self._walk(unicode(root)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_link(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()
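
    # The process_* helpers record one item each; leading '/', '\\' and
    # ':' characters are stripped so archived paths are always relative.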
    def process_dir(self, path, st):
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({'type': 'DIRECTORY', 'path': path})

    def process_link(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s => %s', path, source)
        self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})
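
    # Read a regular file, cut it into chunks with chunkify() and record
    # the resulting chunk indexes together with the stat metadata.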
    def process_file(self, path, st):
        try:
            fd = open(path, 'rb')
        except IOError as e:
            logging.error(e)
            return
        with fd:
            path = path.lstrip('/\\:')
            logging.info(path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                chunks.append(self.process_chunk(chunk))
                size += len(chunk)
            self.items.append({
                'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size,
                'mode': st.st_mode,
                'uid': st.st_uid, 'user': uid2user(st.st_uid),
                'gid': st.st_gid, 'group': gid2group(st.st_gid),
                'ctime': st.st_ctime, 'mtime': st.st_mtime,
            })
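
    # Deduplicate within this archive: reuse the existing index when the
    # chunk id is already referenced, otherwise add it via the cache.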
    def process_chunk(self, data):
        id = hashlib.sha256(data).digest()
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            size = self.cache.add_chunk(id, data)
            self.chunks.append((id, size))
            # Map id -> idx (not idx -> id) so the lookup above succeeds
            # the next time this chunk id appears.
            self.chunk_idx[id] = idx
            return idx