# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. import time
  10. from cStringIO import StringIO
  11. from xattr import xattr, XATTR_NOFOLLOW
  12. from ._speedups import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  14. Counter, encode_filename, Statistics
  15. ITEMS_BUFFER = 1024 * 1024
  16. CHUNK_SIZE = 64 * 1024
  17. WINDOW_SIZE = 4096
  18. have_lchmod = hasattr(os, 'lchmod')
  19. linux = sys.platform == 'linux2'
  20. class Archive(object):
  21. class DoesNotExist(Exception):
  22. pass
  23. class AlreadyExists(Exception):
  24. pass
  25. def __init__(self, store, key, manifest, name, cache=None, create=False,
  26. checkpoint_interval=300, numeric_owner=False):
  27. self.key = key
  28. self.store = store
  29. self.cache = cache
  30. self.manifest = manifest
  31. self.items = StringIO()
  32. self.items_ids = []
  33. self.hard_links = {}
  34. self.stats = Statistics()
  35. self.name = name
  36. self.checkpoint_interval = checkpoint_interval
  37. self.numeric_owner = numeric_owner
  38. if create:
  39. if name in manifest.archives:
  40. raise self.AlreadyExists
  41. self.last_checkpoint = time.time()
  42. i = 0
  43. while True:
  44. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  45. if not self.checkpoint_name in manifest.archives:
  46. break
  47. i += 1
  48. else:
  49. try:
  50. info = self.manifest.archives[name]
  51. except KeyError:
  52. raise self.DoesNotExist
  53. self.load(info['id'])
  54. def load(self, id):
  55. self.id = id
  56. data = self.key.decrypt(self.id, self.store.get(self.id))
  57. self.metadata = msgpack.unpackb(data)
  58. if self.metadata['version'] != 1:
  59. raise Exception('Unknown archive metadata version')
  60. self.name = self.metadata['name']
  61. @property
  62. def ts(self):
  63. """Timestamp of archive creation in UTC"""
  64. t, f = self.metadata['time'].split('.', 1)
  65. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  66. def __repr__(self):
  67. return 'Archive(%r)' % self.name
  68. def iter_items(self, callback):
  69. unpacker = msgpack.Unpacker()
  70. counter = Counter(0)
  71. def cb(chunk, error, id):
  72. if error:
  73. raise error
  74. assert not error
  75. counter.dec()
  76. data = self.key.decrypt(id, chunk)
  77. unpacker.feed(data)
  78. for item in unpacker:
  79. callback(item)
  80. for id in self.metadata['items']:
  81. # Limit the number of concurrent items requests to 10
  82. self.store.flush_rpc(counter, 10)
  83. counter.inc()
  84. self.store.get(id, callback=cb, callback_data=id)
  85. def add_item(self, item):
  86. self.items.write(msgpack.packb(item))
  87. now = time.time()
  88. if now - self.last_checkpoint > self.checkpoint_interval:
  89. self.last_checkpoint = now
  90. self.write_checkpoint()
  91. if self.items.tell() > ITEMS_BUFFER:
  92. self.flush_items()
  93. def flush_items(self, flush=False):
  94. if self.items.tell() == 0:
  95. return
  96. self.items.seek(0)
  97. chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
  98. self.items.seek(0)
  99. self.items.truncate()
  100. for chunk in chunks[:-1]:
  101. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
  102. self.items_ids.append(id)
  103. if flush or len(chunks) == 1:
  104. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
  105. self.items_ids.append(id)
  106. else:
  107. self.items.write(chunks[-1])
  108. def write_checkpoint(self):
  109. self.save(self.checkpoint_name)
  110. del self.manifest.archives[self.checkpoint_name]
  111. self.cache.chunk_decref(self.id)
  112. def save(self, name=None):
  113. name = name or self.name
  114. if name in self.manifest.archives:
  115. raise self.AlreadyExists(name)
  116. self.flush_items(flush=True)
  117. metadata = {
  118. 'version': 1,
  119. 'name': name,
  120. 'items': self.items_ids,
  121. 'cmdline': sys.argv,
  122. 'hostname': socket.gethostname(),
  123. 'username': getuser(),
  124. 'time': datetime.utcnow().isoformat(),
  125. }
  126. data = msgpack.packb(metadata)
  127. self.id = self.key.id_hash(data)
  128. self.cache.add_chunk(self.id, data, self.stats)
  129. self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
  130. self.manifest.write()
  131. self.store.commit()
  132. self.cache.commit()
  133. def calc_stats(self, cache):
  134. # This function is a bit evil since it abuses the cache to calculate
  135. # the stats. The cache transaction must be rolled back afterwards
  136. def cb(chunk, error, id):
  137. assert not error
  138. data = self.key.decrypt(id, chunk)
  139. unpacker.feed(data)
  140. for item in unpacker:
  141. try:
  142. for id, size, csize in item['chunks']:
  143. count, _, _ = self.cache.chunks[id]
  144. stats.update(size, csize, count == 1)
  145. stats.nfiles += 1
  146. self.cache.chunks[id] = count - 1, size, csize
  147. except KeyError:
  148. pass
  149. unpacker = msgpack.Unpacker()
  150. cache.begin_txn()
  151. stats = Statistics()
  152. for id in self.metadata['items']:
  153. self.store.get(id, callback=cb, callback_data=id)
  154. count, size, csize = self.cache.chunks[id]
  155. stats.update(size, csize, count == 1)
  156. self.cache.chunks[id] = count - 1, size, csize
  157. self.store.flush_rpc()
  158. cache.rollback()
  159. return stats
  160. def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True):
  161. dest = dest or os.getcwdu()
  162. assert item['path'][0] not in ('/', '\\', ':')
  163. path = os.path.join(dest, encode_filename(item['path']))
  164. mode = item['mode']
  165. if stat.S_ISDIR(mode):
  166. if not os.path.exists(path):
  167. os.makedirs(path)
  168. if restore_attrs:
  169. self.restore_attrs(path, item)
  170. elif stat.S_ISFIFO(mode):
  171. if not os.path.exists(os.path.dirname(path)):
  172. os.makedirs(os.path.dirname(path))
  173. os.mkfifo(path)
  174. self.restore_attrs(path, item)
  175. elif stat.S_ISLNK(mode):
  176. if not os.path.exists(os.path.dirname(path)):
  177. os.makedirs(os.path.dirname(path))
  178. source = item['source']
  179. if os.path.exists(path):
  180. os.unlink(path)
  181. os.symlink(source, path)
  182. self.restore_attrs(path, item, symlink=True)
  183. elif stat.S_ISREG(mode):
  184. if not os.path.exists(os.path.dirname(path)):
  185. os.makedirs(os.path.dirname(path))
  186. # Hard link?
  187. if 'source' in item:
  188. def link_cb(_, __, item):
  189. source = os.path.join(dest, item['source'])
  190. if os.path.exists(path):
  191. os.unlink(path)
  192. os.link(source, path)
  193. self.store.add_callback(link_cb, item)
  194. else:
  195. def extract_cb(chunk, error, (id, i)):
  196. if i == 0:
  197. state['fd'] = open(path, 'wb')
  198. start_cb(item)
  199. assert not error
  200. data = self.key.decrypt(id, chunk)
  201. state['fd'].write(data)
  202. if i == n - 1:
  203. state['fd'].close()
  204. self.restore_attrs(path, item)
  205. state = {}
  206. n = len(item['chunks'])
  207. ## 0 chunks indicates an empty (0 bytes) file
  208. if n == 0:
  209. open(path, 'wb').close()
  210. start_cb(item)
  211. self.restore_attrs(path, item)
  212. else:
  213. for i, (id, size, csize) in enumerate(item['chunks']):
  214. self.store.get(id, callback=extract_cb, callback_data=(id, i))
  215. else:
  216. raise Exception('Unknown archive item type %r' % item['mode'])
  217. def restore_attrs(self, path, item, symlink=False):
  218. xattrs = item.get('xattrs')
  219. if xattrs:
  220. xa = xattr(path, XATTR_NOFOLLOW)
  221. for k, v in xattrs.items():
  222. try:
  223. xa.set(k, v)
  224. except (IOError, KeyError):
  225. pass
  226. if have_lchmod:
  227. os.lchmod(path, item['mode'])
  228. elif not symlink:
  229. os.chmod(path, item['mode'])
  230. uid = gid = None
  231. if not self.numeric_owner:
  232. uid = user2uid(item['user'])
  233. gid = group2gid(item['group'])
  234. uid = uid or item['uid']
  235. gid = gid or item['gid']
  236. try:
  237. os.lchown(path, uid, gid)
  238. except OSError:
  239. pass
  240. if not symlink:
  241. # FIXME: We should really call futimes here (c extension required)
  242. os.utime(path, (item['mtime'], item['mtime']))
  243. def verify_file(self, item, start, result):
  244. def verify_chunk(chunk, error, (id, i)):
  245. if error:
  246. if not state:
  247. result(item, False)
  248. state[True] = True
  249. return
  250. if i == 0:
  251. start(item)
  252. self.key.decrypt(id, chunk)
  253. if i == n - 1:
  254. result(item, True)
  255. state = {}
  256. n = len(item['chunks'])
  257. if n == 0:
  258. start(item)
  259. result(item, True)
  260. else:
  261. for i, (id, size, csize) in enumerate(item['chunks']):
  262. self.store.get(id, callback=verify_chunk, callback_data=(id, i))
  263. def delete(self, cache):
  264. def callback(chunk, error, id):
  265. assert not error
  266. data = self.key.decrypt(id, chunk)
  267. unpacker.feed(data)
  268. for item in unpacker:
  269. try:
  270. for chunk_id, size, csize in item['chunks']:
  271. self.cache.chunk_decref(chunk_id)
  272. except KeyError:
  273. pass
  274. self.cache.chunk_decref(id)
  275. unpacker = msgpack.Unpacker()
  276. for id in self.metadata['items']:
  277. self.store.get(id, callback=callback, callback_data=id)
  278. self.store.flush_rpc()
  279. self.cache.chunk_decref(self.id)
  280. del self.manifest.archives[self.name]
  281. self.manifest.write()
  282. self.store.commit()
  283. cache.commit()
  284. def stat_attrs(self, st, path):
  285. item = {
  286. 'mode': st.st_mode,
  287. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  288. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  289. 'mtime': st.st_mtime,
  290. }
  291. if self.numeric_owner:
  292. item['user'] = item['group'] = None
  293. try:
  294. xa = xattr(path, XATTR_NOFOLLOW)
  295. xattrs = {}
  296. for key in xa:
  297. # Only store the user namespace on Linux
  298. if linux and not key.startswith('user'):
  299. continue
  300. xattrs[key] = xa[key]
  301. if xattrs:
  302. item['xattrs'] = xattrs
  303. except IOError:
  304. pass
  305. return item
  306. def process_dir(self, path, st):
  307. item = {'path': path.lstrip('/\\:')}
  308. item.update(self.stat_attrs(st, path))
  309. self.add_item(item)
  310. def process_fifo(self, path, st):
  311. item = {'path': path.lstrip('/\\:')}
  312. item.update(self.stat_attrs(st, path))
  313. self.add_item(item)
  314. def process_symlink(self, path, st):
  315. source = os.readlink(path)
  316. item = {'path': path.lstrip('/\\:'), 'source': source}
  317. item.update(self.stat_attrs(st, path))
  318. self.add_item(item)
  319. def process_file(self, path, st, cache):
  320. safe_path = path.lstrip('/\\:')
  321. # Is it a hard link?
  322. if st.st_nlink > 1:
  323. source = self.hard_links.get((st.st_ino, st.st_dev))
  324. if (st.st_ino, st.st_dev) in self.hard_links:
  325. item = self.stat_attrs(st, path)
  326. item.update({'path': safe_path, 'source': source})
  327. self.add_item(item)
  328. return
  329. else:
  330. self.hard_links[st.st_ino, st.st_dev] = safe_path
  331. path_hash = self.key.id_hash(path)
  332. ids = cache.file_known_and_unchanged(path_hash, st)
  333. chunks = None
  334. if ids is not None:
  335. # Make sure all ids are available
  336. for id in ids:
  337. if not cache.seen_chunk(id):
  338. break
  339. else:
  340. chunks = [cache.chunk_incref(id, self.stats) for id in ids]
  341. # Only chunkify the file if needed
  342. if chunks is None:
  343. with open(path, 'rb') as fd:
  344. chunks = []
  345. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  346. self.key.chunk_seed):
  347. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
  348. ids = [id for id, _, _ in chunks]
  349. cache.memorize_file(path_hash, st, ids)
  350. item = {'path': safe_path, 'chunks': chunks}
  351. item.update(self.stat_attrs(st, path))
  352. self.stats.nfiles += 1
  353. self.add_item(item)
  354. @staticmethod
  355. def list_archives(store, key, manifest, cache=None):
  356. for name, info in manifest.archives.items():
  357. yield Archive(store, key, manifest, name, cache=cache)