archive.py

from datetime import datetime
import hashlib
import logging
import msgpack
import os
import stat
import zlib

from .cache import NS_ARCHIVES, NS_CHUNKS
from .chunkifier import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid

CHUNK_SIZE = 55001
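

# Archive glues together a Store (the persistent key/value backend) and a
# Cache (the local index of known archives and chunk reference counts).
# An archive's metadata -- name, timestamp, item list, chunk list -- is
# msgpack-encoded, zlib-compressed and stored under the SHA-256 digest of
# the resulting blob, so the store key doubles as an integrity checksum.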
class Archive(object):

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        if name:
            self.open(name)
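
    # Load an existing archive.  The fetched blob must hash back to the id
    # it was stored under.  Here chunk_idx maps each chunk's position in
    # the archive's chunk list to its id -- the direction extract() and
    # verify() need (create() builds the reverse mapping).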
    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]
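
    # Persist the archive metadata under its own SHA-256 digest, mirroring
    # the integrity check in open().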
    def save(self, name):
        archive = {
            'name': name,
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks,
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()
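
    # While an archive is being built, chunk_idx maps chunk id -> position
    # in self.chunks, so a chunk seen twice is stored once and referenced
    # by the same index.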
    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
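
    # osize: original file bytes, csize: bytes as stored (compressed),
    # usize: bytes unique to this archive, i.e. chunks whose reference
    # count in the cache is exactly one.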
    def stats(self, cache):
        total_osize = 0
        total_csize = 0
        total_usize = 0
        for item in self.items:
            if item['type'] == 'FILE':
                total_osize += item['size']
        for id, size in self.chunks:
            total_csize += size
            if self.cache.seen_chunk(id) == 1:
                total_usize += size
        return dict(osize=total_osize, csize=total_csize, usize=total_usize)

    def list(self):
        for item in self.items:
            print item['path']
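
    # Stored chunk blobs carry a 32-byte SHA-256 digest of the compressed
    # payload as a prefix; extraction verifies it before decompressing.
    # Item paths were stored relative (see process_dir/_link/_file), which
    # the assert re-checks before joining them onto dest.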
    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        for item in self.items:
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
            elif item['type'] == 'SYMLINK':
                logging.info('%s => %s', path, item['source'])
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.symlink(item['source'], path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        data = self.store.get(NS_CHUNKS, id)
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        fd.write(zlib.decompress(data))
                os.chmod(path, item['mode'])
                uid = user2uid(item['user']) or item['uid']
                gid = group2gid(item['group']) or item['gid']
                try:
                    os.chown(path, uid, gid)
                except OSError:
                    pass
                # os.utime() sets (atime, mtime); ctime cannot be restored,
                # so it is reused here as the access time.
                os.utime(path, (item['ctime'], item['mtime']))
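
    # The for/else below is deliberate: the else branch runs only when the
    # chunk loop finishes without break, i.e. every chunk of the file
    # checked out.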
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    logging.info('%s ... OK', item['path'])
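
    # Deleting an archive removes its metadata blob and drops one reference
    # per chunk; the cache's chunk_decref() is expected to handle freeing
    # chunks that reach zero references.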
    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
        for id, size in self.chunks:
            cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()
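
    # Recursive generator over a filesystem tree.  Each entry is yielded
    # before its children so parent directories reach process_dir() ahead
    # of their contents.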
    def _walk(self, path):
        st = os.lstat(path)
        # Yield the entry itself before recursing; without this, directories
        # would never be yielded at all and create() could never emit
        # DIRECTORY items (or archive empty directories).
        yield path, st
        if stat.S_ISDIR(st.st_mode):
            for f in os.listdir(path):
                for x in self._walk(os.path.join(path, f)):
                    yield x
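
    # Build a new archive: walk each given root, dispatch on file type,
    # then store the metadata and register the archive id in the cache.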
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for root in paths:
            for path, st in self._walk(unicode(root)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_link(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()
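
    # lstrip('/\\:') drops leading path separators (and ':'), making the
    # stored paths relative -- the property the assert in extract() relies
    # on.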
    def process_dir(self, path, st):
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({'type': 'DIRECTORY', 'path': path})

    def process_link(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s => %s', path, source)
        self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})
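
    # Files are split into content-defined chunks.  chunkify() comes from
    # chunkifier.py (not shown); CHUNK_SIZE is the target chunk size, and
    # the meaning of the third argument (30) is an assumption -- presumably
    # a rolling-hash window or minimum size.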
    def process_file(self, path, st):
        try:
            fd = open(path, 'rb')
        except IOError, e:
            logging.error(e)
            return
        with fd:
            path = path.lstrip('/\\:')
            logging.info(path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                chunks.append(self.process_chunk(chunk))
                size += len(chunk)
            self.items.append({
                'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size,
                'mode': st.st_mode,
                'uid': st.st_uid, 'user': uid2user(st.st_uid),
                'gid': st.st_gid, 'group': gid2group(st.st_gid),
                'ctime': st.st_ctime, 'mtime': st.st_mtime,
            })
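
    # Deduplication point: a chunk id (SHA-256 of the raw chunk) already in
    # chunk_idx is reused instead of being stored again.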
    def process_chunk(self, data):
        id = hashlib.sha256(data).digest()
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            size = self.cache.add_chunk(id, data)
            self.chunks.append((id, size))
            # Key by id, not by position: otherwise the lookup above never
            # hits and every duplicate chunk would be stored again.
            self.chunk_idx[id] = idx
            return idx
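

# Minimal usage sketch of how the pieces above fit together.  The Store and
# Cache constructors are assumptions -- their real signatures live in
# store.py / cache.py, which are not part of this file:
#
#     store = Store('/path/to/store')    # hypothetical constructor
#     cache = Cache(store)               # hypothetical constructor
#
#     archive = Archive(store, cache)
#     archive.create('monday', ['/home/user'], cache)
#
#     archive = Archive(store, cache, 'monday')
#     archive.verify()
#     archive.extract('/tmp/restore')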