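# archiver.py -- command-line front end for Dedupestore: create, extract,
# verify, list, inspect and delete deduplicated archives kept in a BandStore.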
import os
import hashlib
import logging
import zlib
import argparse
import sys
import stat
from datetime import datetime

import msgpack

from .chunkifier import chunkify
from .cache import Cache, NS_ARCHIVES, NS_CHUNKS
from .bandstore import BandStore
from .helpers import location_validator, pretty_size, LevelFilter

CHUNK_SIZE = 55001
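

# An archive is a named, msgpack-encoded and zlib-compressed record of
# items (files, directories, symlinks) plus the list of content chunks
# those items reference.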
class Archive(object):

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        if name:
            self.open(name)
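
    # Fetch the archive blob by name, check that its SHA-256 digest matches
    # the id it is stored under, then unpack it.  chunk_idx maps each chunk's
    # position within the archive to its id in the store.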
    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]
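
    # Serialize and compress the archive, then store it under the SHA-256
    # digest of the compressed payload; the digest doubles as the archive id.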
    def save(self, name):
        archive = {
            'name': name,
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks,
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()
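
    # While an archive is being created, chunk_idx maps chunk id -> position
    # in self.chunks, so a chunk that occurs more than once in the input is
    # only recorded once.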
    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
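
    # osize: original file bytes; csize: compressed bytes as stored;
    # usize: bytes unique to this archive, i.e. chunks whose every
    # reference in the cache comes from this archive.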
    def stats(self, cache):
        total_osize = 0
        total_csize = 0
        total_usize = 0
        chunk_count = {}
        for item in self.items:
            if item['type'] == 'FILE':
                total_osize += item['size']
                for idx in item['chunks']:
                    id = self.chunk_idx[idx]
                    chunk_count.setdefault(id, 0)
                    chunk_count[id] += 1
        for id, c in chunk_count.items():
            count, size = cache.chunkmap[id]
            total_csize += size
            if c == count:
                total_usize += size
        return dict(osize=total_osize, csize=total_csize, usize=total_usize)

    def list(self):
        for item in self.items:
            print item['path']
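
    # Each stored chunk is the 32-byte SHA-256 of the compressed payload
    # followed by the zlib-compressed payload itself; the digest is checked
    # before anything is written out.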
    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        for item in self.items:
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
            elif item['type'] == 'SYMLINK':
                logging.info('%s => %s', path, item['source'])
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.symlink(item['source'], path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        data = self.store.get(NS_CHUNKS, id)
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        data = zlib.decompress(data)
                        fd.write(data)
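
    # Re-read every chunk of every file and recompute its embedded SHA-256.
    # The for/else logs OK only when no chunk of the file failed the check.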
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    logging.info('%s ... OK', item['path'])
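
    # Remove the archive record and drop one reference to every chunk it
    # uses; chunk_decref() (implemented in Cache) is presumably what reclaims
    # chunks whose reference count reaches zero.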
    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
        for item in self.items:
            if item['type'] == 'FILE':
                for c in item['chunks']:
                    id = self.chunk_idx[c]
                    cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()
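
    # Depth-first walk yielding (path, stat result) pairs.  Directories are
    # yielded before their contents so create() records them (and empty
    # directories survive a round trip).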
    def walk(self, path):
        st = os.lstat(path)
        if stat.S_ISDIR(st.st_mode):
            yield path, st
            for f in os.listdir(path):
                for x in self.walk(os.path.join(path, f)):
                    yield x
        else:
            yield path, st
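
    # Walk each requested path, record one item per directory entry, then
    # persist the archive and register its id under the given name.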
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for path in paths:
            for path, st in self.walk(unicode(path)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_link(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()

    def process_dir(self, path, st):
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({'type': 'DIRECTORY', 'path': path})

    def process_link(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s => %s', path, source)
        self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})
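
    # Split the file into content-defined chunks; cache.add_chunk() is
    # expected to deduplicate and store each chunk, returning (id, size),
    # which add_chunk() turns into a chunk list index for this archive.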
    def process_file(self, path, st):
        try:
            fd = open(path, 'rb')
        except IOError, e:
            logging.error(e)
            return
        with fd:
            path = path.lstrip('/\\:')
            logging.info(path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                size += len(chunk)
                chunks.append(self.add_chunk(*self.cache.add_chunk(chunk)))
        self.items.append({'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size})
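

# Archiver implements the command-line interface; each do_* method handles
# one subcommand and returns the process exit status.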
class Archiver(object):

    def open_store(self, location):
        store = BandStore(location.path)
        cache = Cache(store)
        return store, cache
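
    # Exit status is 1 if any ERROR record passed through the root logger
    # (LevelFilter counts records per level), 0 otherwise.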
    def exit_code_from_logger(self):
        if not self.level_filter.count.get('ERROR'):
            return 0
        else:
            return 1

    def do_create(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache)
        archive.create(args.archive.archive, args.paths, cache)
        return self.exit_code_from_logger()

    def do_extract(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.extract(args.dest)
        return self.exit_code_from_logger()

    def do_delete(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.delete(cache)
        return self.exit_code_from_logger()

    def do_list(self, args):
        store, cache = self.open_store(args.src)
        if args.src.archive:
            archive = Archive(store, cache, args.src.archive)
            archive.list()
        else:
            for archive in sorted(cache.archives):
                print archive
        return self.exit_code_from_logger()

    def do_verify(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.verify()
        return self.exit_code_from_logger()

    def do_info(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        stats = archive.stats(cache)
        print 'Original size:', pretty_size(stats['osize'])
        print 'Compressed size:', pretty_size(stats['csize'])
        print 'Unique data:', pretty_size(stats['usize'])
        return self.exit_code_from_logger()
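
    # Build the argparse command tree, configure logging verbosity, install
    # the LevelFilter used for the exit code, and dispatch to the selected
    # subcommand.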
    def run(self, args=None):
        parser = argparse.ArgumentParser(description='Dedupestore')
        parser.add_argument('-v', '--verbose', dest='verbose',
                            action='store_true', default=False,
                            help='Verbose output')
        subparsers = parser.add_subparsers(title='Available subcommands')
        subparser = subparsers.add_parser('create')
        subparser.set_defaults(func=self.do_create)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to create')
        subparser.add_argument('paths', metavar='PATH', nargs='+', type=str,
                               help='Paths to add to archive')
        subparser = subparsers.add_parser('extract')
        subparser.set_defaults(func=self.do_extract)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to extract')
        subparser.add_argument('dest', metavar='DEST', type=str, nargs='?',
                               help='Where to extract files')
        subparser = subparsers.add_parser('delete')
        subparser.set_defaults(func=self.do_delete)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to delete')
        subparser = subparsers.add_parser('list')
        subparser.set_defaults(func=self.do_list)
        subparser.add_argument('src', metavar='SRC', type=location_validator(),
                               help='Store/Archive to list contents of')
        subparser = subparsers.add_parser('verify')
        subparser.set_defaults(func=self.do_verify)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to verify integrity of')
        subparser = subparsers.add_parser('info')
        subparser.set_defaults(func=self.do_info)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to display information about')
        args = parser.parse_args(args)
        if args.verbose:
            logging.basicConfig(level=logging.INFO, format='%(message)s')
        else:
            logging.basicConfig(level=logging.WARNING, format='%(message)s')
        self.level_filter = LevelFilter()
        logging.getLogger('').addFilter(self.level_filter)
        return args.func(args)


def main():
    archiver = Archiver()
    sys.exit(archiver.run())


if __name__ == '__main__':
    main()