archive.py
  1. from datetime import datetime
  2. import logging
  3. import os
  4. import stat
  5. import sys
  6. from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
  7. from .chunkifier import chunkify
  8. from .crypt import CryptoManager
  9. from .helpers import uid2user, user2uid, gid2group, group2gid
  10. CHUNK_SIZE = 55001
  11. class Archive(object):
  12. def __init__(self, store, name=None):
  13. self.crypt = CryptoManager(store)
  14. self.store = store
  15. self.items = []
  16. self.chunks = []
  17. self.chunk_idx = {}
  18. self.hard_links = {}
  19. if name:
  20. self.load(self.crypt.id_hash(name))
  21. def load(self, id):
  22. self.id = id
  23. archive = self.crypt.unpack_read(self.store.get(NS_ARCHIVES, self.id))
  24. if archive['version'] != 1:
  25. raise Exception('Archive version %r not supported' % archive['version'])
  26. self.items = archive['items']
  27. self.name = archive['name']
  28. cindex = self.crypt.unpack_create(self.store.get(NS_CINDEX, self.id))
  29. assert cindex['version'] == 1
  30. self.chunks = cindex['chunks']
  31. for i, chunk in enumerate(self.chunks):
  32. self.chunk_idx[i] = chunk[0]
  33. def save(self, name):
  34. self.id = self.crypt.id_hash(name)
  35. archive = {
  36. 'version': 1,
  37. 'name': name,
  38. 'cmdline': sys.argv,
  39. 'ts': datetime.utcnow().isoformat(),
  40. 'items': self.items,
  41. }
  42. data = self.crypt.pack_read(archive)
  43. self.store.put(NS_ARCHIVES, self.id, data)
  44. cindex = {
  45. 'version': 1,
  46. 'chunks': self.chunks,
  47. }
  48. data = self.crypt.pack_create(cindex)
  49. self.store.put(NS_CINDEX, self.id, data)
  50. self.crypt.store_key()
  51. self.store.commit()
  52. def add_chunk(self, id, size):
  53. try:
  54. return self.chunk_idx[id]
  55. except KeyError:
  56. idx = len(self.chunks)
  57. self.chunks.append((id, size))
  58. self.chunk_idx[id] = idx
  59. return idx
  60. def stats(self, cache):
  61. osize = csize = usize = 0
  62. for item in self.items:
  63. if item['type'] == 'FILE':
  64. osize += item['size']
  65. for id, size in self.chunks:
  66. csize += size
  67. if cache.seen_chunk(id) == 1:
  68. usize += size
  69. return osize, csize, usize
  70. def list(self):
  71. for item in self.items:
  72. print item['path']
    def extract(self, dest=None):
        """Restore every archive item beneath *dest* (defaults to the cwd).

        Directory metadata is applied only after the directory's contents
        have been written, via dir_stat_queue, so extracting children does
        not clobber the restored timestamps.
        """
        dest = dest or os.getcwdu()
        # Directories whose stat restoration is deferred until we move on
        # past their subtree.
        dir_stat_queue = []
        for item in self.items:
            # Stored paths must be relative; an absolute or drive-qualified
            # path could escape *dest*.
            assert item['path'][0] not in ('/', '\\', ':')
            path = os.path.join(dest, item['path'].decode('utf-8'))
            if item['type'] == 'DIRECTORY':
                logging.info(path)
                if not os.path.exists(path):
                    os.makedirs(path)
                # Defer: children are extracted first.
                dir_stat_queue.append((path, item))
                continue
            elif item['type'] == 'SYMLINK':
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = item['source']
                logging.info('%s -> %s', path, source)
                if os.path.exists(path):
                    os.unlink(path)
                os.symlink(source, path)
                # call_utime=False: utime on the link path would touch the
                # link target instead of the link itself.
                self.restore_stat(path, item, call_utime=False)
            elif item['type'] == 'HARDLINK':
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                source = os.path.join(dest, item['source'])
                logging.info('%s => %s', path, source)
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            elif item['type'] == 'FILE':
                logging.info(path)
                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                with open(path, 'wb') as fd:
                    for chunk in item['chunks']:
                        # item['chunks'] holds indices into the archive's
                        # chunk list; chunk_idx maps them to store ids.
                        id = self.chunk_idx[chunk]
                        try:
                            fd.write(self.crypt.unpack_read(self.store.get(NS_CHUNKS, id)))
                        except ValueError:
                            raise Exception('Invalid chunk checksum')
                self.restore_stat(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item['type'])
            # The current path left the most recently queued directory's
            # subtree: apply that directory's deferred stats now.
            if dir_stat_queue and not path.startswith(dir_stat_queue[-1][0]):
                self.restore_stat(*dir_stat_queue.pop())
  118. def restore_stat(self, path, item, call_utime=True):
  119. os.lchmod(path, item['mode'])
  120. uid = user2uid(item['user']) or item['uid']
  121. gid = group2gid(item['group']) or item['gid']
  122. try:
  123. os.lchown(path, uid, gid)
  124. except OSError:
  125. pass
  126. if call_utime:
  127. # FIXME: We should really call futimes here (c extension required)
  128. os.utime(path, (item['ctime'], item['mtime']))
  129. def verify(self):
  130. for item in self.items:
  131. if item['type'] == 'FILE':
  132. item['path'] = item['path'].decode('utf-8')
  133. for chunk in item['chunks']:
  134. id = self.chunk_idx[chunk]
  135. try:
  136. self.crypt.unpack_read(self.store.get(NS_CHUNKS, id))
  137. except ValueError:
  138. logging.error('%s ... ERROR', item['path'])
  139. break
  140. else:
  141. logging.info('%s ... OK', item['path'])
  142. def delete(self, cache):
  143. self.store.delete(NS_ARCHIVES, self.id)
  144. self.store.delete(NS_CINDEX, self.id)
  145. for id, size in self.chunks:
  146. cache.chunk_decref(id)
  147. self.store.commit()
  148. cache.save()
  149. def _walk(self, path):
  150. st = os.lstat(path)
  151. yield path, st
  152. if stat.S_ISDIR(st.st_mode):
  153. for f in os.listdir(path):
  154. for x in self._walk(os.path.join(path, f)):
  155. yield x
  156. def create(self, name, paths, cache):
  157. try:
  158. self.store.get(NS_ARCHIVES, name)
  159. except self.store.DoesNotExist:
  160. pass
  161. else:
  162. raise NameError('Archive already exists')
  163. for path in paths:
  164. for path, st in self._walk(unicode(path)):
  165. if stat.S_ISDIR(st.st_mode):
  166. self.process_dir(path, st)
  167. elif stat.S_ISLNK(st.st_mode):
  168. self.process_symlink(path, st)
  169. elif stat.S_ISREG(st.st_mode):
  170. self.process_file(path, st, cache)
  171. else:
  172. logging.error('Unknown file type: %s', path)
  173. self.save(name)
  174. cache.save()
  175. def process_dir(self, path, st):
  176. path = path.lstrip('/\\:')
  177. logging.info(path)
  178. self.items.append({
  179. 'type': 'DIRECTORY', 'path': path,
  180. 'mode': st.st_mode,
  181. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  182. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  183. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  184. })
  185. def process_symlink(self, path, st):
  186. source = os.readlink(path)
  187. path = path.lstrip('/\\:')
  188. logging.info('%s -> %s', path, source)
  189. self.items.append({
  190. 'type': 'SYMLINK', 'path': path, 'source': source,
  191. 'mode': st.st_mode,
  192. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  193. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  194. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  195. })
  196. def process_file(self, path, st, cache):
  197. safe_path = path.lstrip('/\\:')
  198. if st.st_nlink > 1:
  199. source = self.hard_links.get((st.st_ino, st.st_dev))
  200. if (st.st_ino, st.st_dev) in self.hard_links:
  201. logging.info('%s => %s', path, source)
  202. self.items.append({ 'type': 'HARDLINK',
  203. 'path': path, 'source': source})
  204. return
  205. else:
  206. self.hard_links[st.st_ino, st.st_dev] = safe_path
  207. try:
  208. fd = open(path, 'rb')
  209. except IOError, e:
  210. logging.error(e)
  211. return
  212. with fd:
  213. logging.info(safe_path)
  214. chunks = []
  215. size = 0
  216. for chunk in chunkify(fd, CHUNK_SIZE, 30):
  217. chunks.append(self.process_chunk(chunk, cache))
  218. size += len(chunk)
  219. self.items.append({
  220. 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
  221. 'mode': st.st_mode,
  222. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  223. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  224. 'ctime': st.st_ctime, 'mtime': st.st_mtime,
  225. })
  226. def process_chunk(self, data, cache):
  227. id = self.crypt.id_hash(data)
  228. try:
  229. return self.chunk_idx[id]
  230. except KeyError:
  231. idx = len(self.chunks)
  232. size = cache.add_chunk(id, data, self.crypt)
  233. self.chunks.append((id, size))
  234. self.chunk_idx[id] = idx
  235. return idx
  236. @staticmethod
  237. def list_archives(store):
  238. for id in store.list(NS_ARCHIVES):
  239. archive = Archive(store)
  240. archive.load(id)
  241. yield archive