archive.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from datetime import datetime
  2. import hashlib
  3. import logging
  4. import msgpack
  5. import os
  6. import stat
  7. import zlib
  8. from .cache import NS_ARCHIVES, NS_CHUNKS
  9. from .chunkifier import chunkify
  10. from .helpers import uid2user, user2uid, gid2group, group2gid
# Target chunk size in bytes fed to chunkify() in process_file().
# NOTE(review): presumably chosen for the rolling chunker's window math —
# confirm against chunkify's implementation.
CHUNK_SIZE = 55001
class Archive(object):
    """A named snapshot of files stored as deduplicated, compressed chunks.

    Items (DIRECTORY/SYMLINK/FILE dicts) live in self.items; file contents
    are referenced by index through self.chunks / self.chunk_idx and
    persisted via the backing store under NS_ARCHIVES and NS_CHUNKS.
    """
  13. def __init__(self, store, cache, name=None):
  14. self.store = store
  15. self.cache = cache
  16. self.items = []
  17. self.chunks = []
  18. self.chunk_idx = {}
  19. if name:
  20. self.open(name)
  21. def open(self, name):
  22. id = self.cache.archives[name]
  23. data = self.store.get(NS_ARCHIVES, id)
  24. if hashlib.sha256(data).digest() != id:
  25. raise Exception('Archive hash did not match')
  26. archive = msgpack.unpackb(zlib.decompress(data))
  27. self.items = archive['items']
  28. self.name = archive['name']
  29. self.chunks = archive['chunks']
  30. for i, chunk in enumerate(archive['chunks']):
  31. self.chunk_idx[i] = chunk[0]
  32. def save(self, name):
  33. archive = {
  34. 'name': name,
  35. 'ts': datetime.utcnow().isoformat(),
  36. 'items': self.items,
  37. 'chunks': self.chunks
  38. }
  39. data = zlib.compress(msgpack.packb(archive))
  40. self.id = hashlib.sha256(data).digest()
  41. self.store.put(NS_ARCHIVES, self.id, data)
  42. self.store.commit()
  43. def add_chunk(self, id, size):
  44. try:
  45. return self.chunk_idx[id]
  46. except KeyError:
  47. idx = len(self.chunks)
  48. self.chunks.append((id, size))
  49. self.chunk_idx[id] = idx
  50. return idx
  51. def stats(self, cache):
  52. total_osize = 0
  53. total_csize = 0
  54. total_usize = 0
  55. chunk_count = {}
  56. for item in self.items:
  57. if item['type'] == 'FILE':
  58. total_osize += item['size']
  59. for idx in item['chunks']:
  60. id = self.chunk_idx[idx]
  61. chunk_count.setdefault(id, 0)
  62. chunk_count[id] += 1
  63. for id, c in chunk_count.items():
  64. count, size = cache.chunkmap[id]
  65. total_csize += size
  66. if c == count:
  67. total_usize += size
  68. return dict(osize=total_osize, csize=total_csize, usize=total_usize)
  69. def list(self):
  70. for item in self.items:
  71. print item['path']
def extract(self, dest=None):
    """Restore all archived items beneath *dest* (default: current directory).

    Recreates directories, symlinks and regular files; for files it also
    restores mode, ownership (best effort) and timestamps. Raises Exception
    on a chunk checksum mismatch.
    """
    dest = dest or os.getcwdu()
    for item in self.items:
        # Stored paths must be relative — refuse absolute or drive-prefixed ones.
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, item['path'].decode('utf-8'))
        if item['type'] == 'DIRECTORY':
            logging.info(path)
            if not os.path.exists(path):
                os.makedirs(path)
        elif item['type'] == 'SYMLINK':
            logging.info('%s => %s', path, item['source'])
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.symlink(item['source'], path)
        elif item['type'] == 'FILE':
            logging.info(path)
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            with open(path, 'wb') as fd:
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    # Stored chunk layout: 32-byte SHA-256 digest, then zlib payload.
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        raise Exception('Invalid chunk checksum')
                    data = zlib.decompress(data)
                    fd.write(data)
            os.chmod(path, item['mode'])
            # Name-based lookup takes priority over the stored numeric ids.
            # NOTE(review): `or` also falls back when the lookup returns 0
            # (root) — confirm that is intended.
            uid = user2uid(item['user']) or item['uid']
            gid = group2gid(item['group']) or item['gid']
            try:
                os.chown(path, uid, gid)
            except OSError:
                # Best effort: chown typically requires privileges.
                pass
            # NOTE(review): os.utime takes (atime, mtime); ctime is being
            # passed as atime here — confirm this is deliberate.
            os.utime(path, (item['ctime'], item['mtime']))
  108. def verify(self):
  109. for item in self.items:
  110. if item['type'] == 'FILE':
  111. item['path'] = item['path'].decode('utf-8')
  112. for chunk in item['chunks']:
  113. id = self.chunk_idx[chunk]
  114. data = self.store.get(NS_CHUNKS, id)
  115. data = self.store.get(NS_CHUNKS, id)
  116. cid = data[:32]
  117. data = data[32:]
  118. if (hashlib.sha256(data).digest() != cid):
  119. logging.error('%s ... ERROR', item['path'])
  120. break
  121. else:
  122. logging.info('%s ... OK', item['path'])
  123. def delete(self, cache):
  124. self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
  125. for item in self.items:
  126. if item['type'] == 'FILE':
  127. for c in item['chunks']:
  128. id = self.chunk_idx[c]
  129. cache.chunk_decref(id)
  130. self.store.commit()
  131. del cache.archives[self.name]
  132. cache.save()
  133. def walk(self, path):
  134. st = os.lstat(path)
  135. if stat.S_ISDIR(st.st_mode):
  136. for f in os.listdir(path):
  137. for x in self.walk(os.path.join(path, f)):
  138. yield x
  139. else:
  140. yield path, st
  141. def create(self, name, paths, cache):
  142. if name in cache.archives:
  143. raise NameError('Archive already exists')
  144. for path in paths:
  145. for path, st in self.walk(unicode(path)):
  146. if stat.S_ISDIR(st.st_mode):
  147. self.process_dir(path, st)
  148. elif stat.S_ISLNK(st.st_mode):
  149. self.process_link(path, st)
  150. elif stat.S_ISREG(st.st_mode):
  151. self.process_file(path, st)
  152. else:
  153. logging.error('Unknown file type: %s', path)
  154. self.save(name)
  155. cache.archives[name] = self.id
  156. cache.save()
  157. def process_dir(self, path, st):
  158. path = path.lstrip('/\\:')
  159. logging.info(path)
  160. self.items.append({'type': 'DIRECTORY', 'path': path})
  161. def process_link(self, path, st):
  162. source = os.readlink(path)
  163. path = path.lstrip('/\\:')
  164. logging.info('%s => %s', path, source)
  165. self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})
  166. def process_file(self, path, st):
  167. try:
  168. fd = open(path, 'rb')
  169. except IOError, e:
  170. logging.error(e)
  171. return
  172. with fd:
  173. path = path.lstrip('/\\:')
  174. logging.info(path)
  175. chunks = []
  176. size = 0
  177. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  178. size += len(chunk)
  179. chunks.append(self.add_chunk(*self.cache.add_chunk(chunk)))
  180. self.items.append({
  181. 'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size,
  182. 'mode': st.st_mode,
  183. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  184. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  185. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  186. })