archiver.py

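"""Command-line front end for Dedupestore: create, extract, verify, list,
delete and inspect deduplicated archives stored in a BandStore."""
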
import os
import hashlib
import logging
import zlib
import argparse
import sys
import stat
from datetime import datetime

import msgpack

from chunkifier import chunkify
from cache import Cache, NS_ARCHIVES, NS_CHUNKS
from bandstore import BandStore
from helpers import location_validator, pretty_size

CHUNK_SIZE = 55001
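

# Logging filter that never drops records; it only counts how many records of
# each level were emitted so exit_code_from_logger() can derive an exit code.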
class LevelFilter(logging.Filter):

    def __init__(self, *args, **kwargs):
        logging.Filter.__init__(self, *args, **kwargs)
        self.count = {}

    def filter(self, record):
        self.count.setdefault(record.levelname, 0)
        self.count[record.levelname] += 1
        return record
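

# An Archive is a zlib-compressed, msgpack-encoded mapping ('name', 'ts',
# 'items', 'chunks') stored in the NS_ARCHIVES namespace under its SHA-256
# digest; items reference file data through per-archive chunk indexes.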
class Archive(object):

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        if name:
            self.open(name)
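
    # When an existing archive is opened, chunk_idx maps chunk index -> chunk id;
    # while creating a new archive, add_chunk() maps chunk id -> chunk index so a
    # chunk repeated within the archive is only recorded once.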
    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]

    def save(self, name):
        archive = {
            'name': name,
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()

    def add_chunk(self, id, size):
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
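
    # Size accounting for the 'info' command: osize sums the original file sizes,
    # csize counts the cache.chunkmap size of each referenced chunk once, and
    # usize counts only the chunks referenced by no other archive.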
    def stats(self, cache):
        total_osize = 0
        total_csize = 0
        total_usize = 0
        chunk_count = {}
        for item in self.items:
            if item['type'] == 'FILE':
                total_osize += item['size']
                for idx in item['chunks']:
                    id = self.chunk_idx[idx]
                    chunk_count.setdefault(id, 0)
                    chunk_count[id] += 1
        for id, c in chunk_count.items():
            count, size = cache.chunkmap[id]
            total_csize += size
            if c == count:
                total_usize += size
        return dict(osize=total_osize, csize=total_csize, usize=total_usize)

    def list(self):
        for item in self.items:
            print item['path']
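
    # Chunks live in the NS_CHUNKS namespace as a 32-byte SHA-256 digest followed
    # by the zlib-compressed payload; extract() re-checks each digest before
    # writing the decompressed data.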
    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        for item in self.items:
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
            elif item['type'] == 'SYMLINK':
                logging.info('%s => %s', path, item['source'])
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.symlink(item['source'], path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        data = self.store.get(NS_CHUNKS, id)
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        fd.write(zlib.decompress(data))
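
    # verify() re-reads and checksums every chunk of every file without writing
    # anything; the for/else logs OK only when no chunk of the file failed.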
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    logging.info('%s ... OK', item['path'])
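
    # delete() removes the archive manifest and releases one cache reference per
    # chunk occurrence; reclaiming the chunk data itself is presumably left to
    # the cache/store once a chunk's reference count drops to zero.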
    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
        for item in self.items:
            if item['type'] == 'FILE':
                for c in item['chunks']:
                    id = self.chunk_idx[c]
                    cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()

    def walk(self, path):
        st = os.lstat(path)
        if stat.S_ISDIR(st.st_mode):
            for f in os.listdir(path):
                for x in self.walk(os.path.join(path, f)):
                    yield x
        else:
            yield path, st
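
    # create() walks every given path, records directories, symlinks and regular
    # files as items, then saves the manifest and registers it in the cache.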
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for path in paths:
            for path, st in self.walk(unicode(path)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_link(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()

    def process_dir(self, path, st):
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({'type': 'DIRECTORY', 'path': path})

    def process_link(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s => %s', path, source)
        self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})
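
    # Regular files are cut into content-defined chunks by chunkify(); for each
    # chunk, cache.add_chunk() appears to store it (or bump its reference count)
    # and return (id, size), which add_chunk() turns into a per-archive index.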
    def process_file(self, path, st):
        try:
            fd = open(path, 'rb')
        except IOError, e:
            logging.error(e)
            return
        with fd:
            path = path.lstrip('/\\:')
            logging.info(path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                size += len(chunk)
                chunks.append(self.add_chunk(*self.cache.add_chunk(chunk)))
        self.items.append({'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size})
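

# Archiver wires the command-line subcommands to the Archive operations above;
# each do_* handler returns exit code 0 unless an ERROR-level record was logged.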
class Archiver(object):

    def open_store(self, location):
        store = BandStore(location.path)
        cache = Cache(store)
        return store, cache

    def exit_code_from_logger(self):
        if not self.level_filter.count.get('ERROR'):
            return 0
        else:
            return 1

    def do_create(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache)
        archive.create(args.archive.archive, args.paths, cache)
        return self.exit_code_from_logger()

    def do_extract(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.extract(args.dest)
        return self.exit_code_from_logger()

    def do_delete(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.delete(cache)
        return self.exit_code_from_logger()

    def do_list(self, args):
        store, cache = self.open_store(args.src)
        if args.src.archive:
            archive = Archive(store, cache, args.src.archive)
            archive.list()
        else:
            for archive in sorted(cache.archives):
                print archive
        return self.exit_code_from_logger()

    def do_verify(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.verify()
        return self.exit_code_from_logger()

    def do_info(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        stats = archive.stats(cache)
        print 'Original size:', pretty_size(stats['osize'])
        print 'Compressed size:', pretty_size(stats['csize'])
        print 'Unique data:', pretty_size(stats['usize'])
        return self.exit_code_from_logger()
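
    # run() builds the argparse command tree, configures logging verbosity, and
    # installs the LevelFilter that exit_code_from_logger() consults.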
    def run(self, args=None):
        parser = argparse.ArgumentParser(description='Dedupestore')
        parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                            default=False,
                            help='Verbose output')
        subparsers = parser.add_subparsers(title='Available subcommands')

        subparser = subparsers.add_parser('create')
        subparser.set_defaults(func=self.do_create)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to create')
        subparser.add_argument('paths', metavar='PATH', nargs='+', type=str,
                               help='Paths to add to archive')

        subparser = subparsers.add_parser('extract')
        subparser.set_defaults(func=self.do_extract)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to extract')
        subparser.add_argument('dest', metavar='DEST', type=str, nargs='?',
                               help='Where to extract files')

        subparser = subparsers.add_parser('delete')
        subparser.set_defaults(func=self.do_delete)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to delete')

        subparser = subparsers.add_parser('list')
        subparser.set_defaults(func=self.do_list)
        subparser.add_argument('src', metavar='SRC', type=location_validator(),
                               help='Store/Archive to list contents of')

        subparser = subparsers.add_parser('verify')
        subparser.set_defaults(func=self.do_verify)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to verify integrity of')

        subparser = subparsers.add_parser('info')
        subparser.set_defaults(func=self.do_info)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to display information about')

        args = parser.parse_args(args)
        if args.verbose:
            logging.basicConfig(level=logging.INFO, format='%(message)s')
        else:
            logging.basicConfig(level=logging.WARNING, format='%(message)s')
        self.level_filter = LevelFilter()
        logging.getLogger('').addFilter(self.level_filter)
        return args.func(args)


def main():
    archiver = Archiver()
    sys.exit(archiver.run())


if __name__ == '__main__':
    main()