archiver.py

import os
import hashlib
import logging
import zlib
import argparse
import sys
import stat
from datetime import datetime

import msgpack

from .chunkifier import chunkify
from .cache import Cache, NS_ARCHIVES, NS_CHUNKS
from .bandstore import BandStore
from .helpers import location_validator, pretty_size, LevelFilter, \
    uid2user, user2uid, gid2group, group2gid

CHUNK_SIZE = 55001
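
# Storage model, as implied by the code below: an archive is a
# zlib-compressed msgpack dict stored under the SHA-256 digest of its
# serialized form; file contents are stored as individual chunks under
# NS_CHUNKS, each prefixed with its own 32-byte SHA-256 digest.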


class Archive(object):

    def __init__(self, store, cache, name=None):
        self.store = store
        self.cache = cache
        self.items = []
        self.chunks = []
        self.chunk_idx = {}
        if name:
            self.open(name)

    def open(self, name):
        id = self.cache.archives[name]
        data = self.store.get(NS_ARCHIVES, id)
        if hashlib.sha256(data).digest() != id:
            raise Exception('Archive hash did not match')
        archive = msgpack.unpackb(zlib.decompress(data))
        self.items = archive['items']
        self.name = archive['name']
        self.chunks = archive['chunks']
        # Map archive-local chunk index -> store chunk id for extract/verify.
        for i, chunk in enumerate(archive['chunks']):
            self.chunk_idx[i] = chunk[0]

    def save(self, name):
        archive = {
            'name': name,
            'ts': datetime.utcnow().isoformat(),
            'items': self.items,
            'chunks': self.chunks,
        }
        data = zlib.compress(msgpack.packb(archive))
        self.id = hashlib.sha256(data).digest()
        self.store.put(NS_ARCHIVES, self.id, data)
        self.store.commit()

    def add_chunk(self, id, size):
        # Writing direction: map chunk id -> archive-local index, reusing
        # the existing index when a chunk repeats within the archive.
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx
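
    # stats() reports three sizes: osize (sum of original file sizes),
    # csize (stored size of every chunk this archive references), and
    # usize (stored size of chunks referenced by no other archive).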
    def stats(self, cache):
        total_osize = 0
        total_csize = 0
        total_usize = 0
        chunk_count = {}
        for item in self.items:
            if item['type'] == 'FILE':
                total_osize += item['size']
                for idx in item['chunks']:
                    id = self.chunk_idx[idx]
                    chunk_count.setdefault(id, 0)
                    chunk_count[id] += 1
        for id, c in chunk_count.items():
            count, size = cache.chunkmap[id]
            total_csize += size
            # A chunk counts as "unique" to this archive when the archive
            # holds every reference to it.
            if c == count:
                total_usize += size
        return dict(osize=total_osize, csize=total_csize, usize=total_usize)

    def list(self):
        for item in self.items:
            print item['path']

    def extract(self, dest=None):
        dest = dest or os.getcwdu()
        for item in self.items:
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
            elif item['type'] == 'SYMLINK':
                logging.info('%s => %s', path, item['source'])
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                os.symlink(item['source'], path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        id = self.chunk_idx[chunk]
                        data = self.store.get(NS_CHUNKS, id)
                        # Stored chunk layout: 32-byte SHA-256 digest
                        # followed by the zlib-compressed payload.
                        cid = data[:32]
                        data = data[32:]
                        if hashlib.sha256(data).digest() != cid:
                            raise Exception('Invalid chunk checksum')
                        fd.write(zlib.decompress(data))
                os.chmod(path, item['mode'])
                # Prefer the symbolic user/group if it exists on this system,
                # otherwise fall back to the stored numeric ids.
                uid = user2uid(item['user']) or item['uid']
                gid = group2gid(item['group']) or item['gid']
                try:
                    os.chown(path, uid, gid)
                except OSError:
                    pass
                # os.utime() takes (atime, mtime); ctime cannot be set, so the
                # stored ctime stands in for the atime that was not recorded.
                os.utime(path, (item['ctime'], item['mtime']))
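
    # verify() re-reads every FILE chunk from the store and checks its
    # SHA-256 digest against the stored prefix; it writes nothing to disk.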
    def verify(self):
        for item in self.items:
            if item['type'] == 'FILE':
                item['path'] = item['path'].decode('utf-8')
                for chunk in item['chunks']:
                    id = self.chunk_idx[chunk]
                    data = self.store.get(NS_CHUNKS, id)
                    cid = data[:32]
                    data = data[32:]
                    if hashlib.sha256(data).digest() != cid:
                        logging.error('%s ... ERROR', item['path'])
                        break
                else:
                    logging.info('%s ... OK', item['path'])

    def delete(self, cache):
        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
        for item in self.items:
            if item['type'] == 'FILE':
                # Release this archive's references to the file's chunks.
                for c in item['chunks']:
                    id = self.chunk_idx[c]
                    cache.chunk_decref(id)
        self.store.commit()
        del cache.archives[self.name]
        cache.save()

    def walk(self, path):
        st = os.lstat(path)
        # Yield the entry itself (including directories, so that create()
        # can record them via process_dir()) before recursing.
        yield path, st
        if stat.S_ISDIR(st.st_mode):
            for f in os.listdir(path):
                for x in self.walk(os.path.join(path, f)):
                    yield x
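
    # create() walks every requested path, records one item per directory,
    # symlink, and regular file, then serializes the whole archive in save().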
    def create(self, name, paths, cache):
        if name in cache.archives:
            raise NameError('Archive already exists')
        for path in paths:
            for path, st in self.walk(unicode(path)):
                if stat.S_ISDIR(st.st_mode):
                    self.process_dir(path, st)
                elif stat.S_ISLNK(st.st_mode):
                    self.process_link(path, st)
                elif stat.S_ISREG(st.st_mode):
                    self.process_file(path, st)
                else:
                    logging.error('Unknown file type: %s', path)
        self.save(name)
        cache.archives[name] = self.id
        cache.save()

    def process_dir(self, path, st):
        # Strip leading path separators so archived paths stay relative.
        path = path.lstrip('/\\:')
        logging.info(path)
        self.items.append({'type': 'DIRECTORY', 'path': path})

    def process_link(self, path, st):
        source = os.readlink(path)
        path = path.lstrip('/\\:')
        logging.info('%s => %s', path, source)
        self.items.append({'type': 'SYMLINK', 'path': path, 'source': source})

    def process_file(self, path, st):
        try:
            fd = open(path, 'rb')
        except IOError as e:
            logging.error(e)
            return
        with fd:
            path = path.lstrip('/\\:')
            logging.info(path)
            chunks = []
            size = 0
            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                size += len(chunk)
                # cache.add_chunk() deduplicates the chunk in the store and
                # returns (id, size); add_chunk() records it in this archive.
                chunks.append(self.add_chunk(*self.cache.add_chunk(chunk)))
            self.items.append({
                'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size,
                'mode': st.st_mode,
                'uid': st.st_uid, 'user': uid2user(st.st_uid),
                'gid': st.st_gid, 'group': gid2group(st.st_gid),
                'ctime': st.st_ctime, 'mtime': st.st_mtime,
            })
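
# Archiver is the command-line front end: each do_* method maps to a
# subcommand and returns a process exit code.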

class Archiver(object):

    def open_store(self, location):
        store = BandStore(location.path)
        cache = Cache(store)
        return store, cache

    def exit_code_from_logger(self):
        # Exit with 1 if any ERROR records were logged during the command.
        return 1 if self.level_filter.count.get('ERROR') else 0

    def do_create(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache)
        archive.create(args.archive.archive, args.paths, cache)
        return self.exit_code_from_logger()

    def do_extract(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.extract(args.dest)
        return self.exit_code_from_logger()

    def do_delete(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.delete(cache)
        return self.exit_code_from_logger()

    def do_list(self, args):
        store, cache = self.open_store(args.src)
        if args.src.archive:
            archive = Archive(store, cache, args.src.archive)
            archive.list()
        else:
            for archive in sorted(cache.archives):
                print archive
        return self.exit_code_from_logger()

    def do_verify(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        archive.verify()
        return self.exit_code_from_logger()

    def do_info(self, args):
        store, cache = self.open_store(args.archive)
        archive = Archive(store, cache, args.archive.archive)
        stats = archive.stats(cache)
        print 'Original size:', pretty_size(stats['osize'])
        print 'Compressed size:', pretty_size(stats['csize'])
        print 'Unique data:', pretty_size(stats['usize'])
        return self.exit_code_from_logger()

    def run(self, args=None):
        parser = argparse.ArgumentParser(description='Dedupestore')
        parser.add_argument('-v', '--verbose', dest='verbose',
                            action='store_true', default=False,
                            help='Verbose output')
        subparsers = parser.add_subparsers(title='Available subcommands')
        subparser = subparsers.add_parser('create')
        subparser.set_defaults(func=self.do_create)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to create')
        subparser.add_argument('paths', metavar='PATH', nargs='+', type=str,
                               help='Paths to add to archive')
        subparser = subparsers.add_parser('extract')
        subparser.set_defaults(func=self.do_extract)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to extract')
        subparser.add_argument('dest', metavar='DEST', type=str, nargs='?',
                               help='Where to extract files')
        subparser = subparsers.add_parser('delete')
        subparser.set_defaults(func=self.do_delete)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to delete')
        subparser = subparsers.add_parser('list')
        subparser.set_defaults(func=self.do_list)
        subparser.add_argument('src', metavar='SRC', type=location_validator(),
                               help='Store/Archive to list contents of')
        subparser = subparsers.add_parser('verify')
        subparser.set_defaults(func=self.do_verify)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to verify integrity of')
        subparser = subparsers.add_parser('info')
        subparser.set_defaults(func=self.do_info)
        subparser.add_argument('archive', metavar='ARCHIVE',
                               type=location_validator(archive=True),
                               help='Archive to display information about')
        args = parser.parse_args(args)
        if args.verbose:
            logging.basicConfig(level=logging.INFO, format='%(message)s')
        else:
            logging.basicConfig(level=logging.WARNING, format='%(message)s')
        # Count log records by level so the exit code can reflect errors.
        self.level_filter = LevelFilter()
        logging.getLogger('').addFilter(self.level_filter)
        return args.func(args)


def main():
    archiver = Archiver()
    sys.exit(archiver.run())


if __name__ == '__main__':
    main()
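
# Example session (illustrative only: the exact STORE::ARCHIVE location
# syntax depends on helpers.location_validator(), which is not shown here,
# and the module path `dedupestore.archiver` is an assumption):
#
#   $ python -m dedupestore.archiver create /tmp/store::docs ~/Documents
#   $ python -m dedupestore.archiver list /tmp/store
#   $ python -m dedupestore.archiver extract /tmp/store::docs /tmp/restore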