# archive.py (14 KB)
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. from itertools import izip
  5. import msgpack
  6. import os
  7. import socket
  8. import stat
  9. import sys
  10. import time
  11. from cStringIO import StringIO
  12. from xattr import xattr, XATTR_NOFOLLOW
  13. from ._speedups import chunkify
  14. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  15. encode_filename, Statistics
# Flush the buffered item metadata to the store once it exceeds this size.
ITEMS_BUFFER = 1024 * 1024
# Target chunk size and rolling-hash window size passed to chunkify().
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096
# os.lchmod only exists on some platforms (e.g. BSD / OS X).
have_lchmod = hasattr(os, 'lchmod')
# 'linux2' is the sys.platform value for Linux on Python 2.
linux = sys.platform == 'linux2'
  21. class ItemIter(object):
  22. def __init__(self, unpacker, filter):
  23. self.unpacker = iter(unpacker)
  24. self.filter = filter
  25. self.stack = []
  26. self._peek = None
  27. self._peek_iter = None
  28. global foo
  29. foo = self
  30. def __iter__(self):
  31. return self
  32. def next(self):
  33. if self.stack:
  34. return self.stack.pop(0)
  35. self._peek = None
  36. return self.get_next()
  37. def get_next(self):
  38. next = self.unpacker.next()
  39. while self.filter and not self.filter(next):
  40. next = self.unpacker.next()
  41. return next
  42. def peek(self):
  43. while True:
  44. while not self._peek or not self._peek_iter:
  45. if len(self.stack) > 100:
  46. raise StopIteration
  47. self._peek = self.get_next()
  48. self.stack.append(self._peek)
  49. if 'chunks' in self._peek:
  50. self._peek_iter = iter(self._peek['chunks'])
  51. else:
  52. self._peek_iter = None
  53. try:
  54. return self._peek_iter.next()
  55. except StopIteration:
  56. self._peek = None
  57. class Archive(object):
    # Raised when the requested archive name is not in the manifest.
    class DoesNotExist(Exception):
        pass

    # Raised when creating/saving under a name already in the manifest.
    class AlreadyExists(Exception):
        pass
  62. def __init__(self, store, key, manifest, name, cache=None, create=False,
  63. checkpoint_interval=300, numeric_owner=False):
  64. if sys.platform == 'darwin':
  65. self.cwd = os.getcwdu()
  66. else:
  67. self.cwd = os.getcwd()
  68. self.key = key
  69. self.store = store
  70. self.cache = cache
  71. self.manifest = manifest
  72. self.items = StringIO()
  73. self.items_ids = []
  74. self.hard_links = {}
  75. self.stats = Statistics()
  76. self.name = name
  77. self.checkpoint_interval = checkpoint_interval
  78. self.numeric_owner = numeric_owner
  79. if create:
  80. if name in manifest.archives:
  81. raise self.AlreadyExists
  82. self.last_checkpoint = time.time()
  83. i = 0
  84. while True:
  85. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  86. if not self.checkpoint_name in manifest.archives:
  87. break
  88. i += 1
  89. else:
  90. try:
  91. info = self.manifest.archives[name]
  92. except KeyError:
  93. raise self.DoesNotExist
  94. self.load(info['id'])
  95. def load(self, id):
  96. self.id = id
  97. data = self.key.decrypt(self.id, self.store.get(self.id))
  98. self.metadata = msgpack.unpackb(data)
  99. if self.metadata['version'] != 1:
  100. raise Exception('Unknown archive metadata version')
  101. self.name = self.metadata['name']
  102. @property
  103. def ts(self):
  104. """Timestamp of archive creation in UTC"""
  105. t, f = self.metadata['time'].split('.', 1)
  106. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  107. def __repr__(self):
  108. return 'Archive(%r)' % self.name
  109. def iter_items(self, filter=None):
  110. unpacker = msgpack.Unpacker()
  111. for id in self.metadata['items']:
  112. unpacker.feed(self.key.decrypt(id, self.store.get(id)))
  113. iter = ItemIter(unpacker, filter)
  114. for item in iter:
  115. yield item, iter.peek
  116. def add_item(self, item):
  117. self.items.write(msgpack.packb(item))
  118. now = time.time()
  119. if now - self.last_checkpoint > self.checkpoint_interval:
  120. self.last_checkpoint = now
  121. self.write_checkpoint()
  122. if self.items.tell() > ITEMS_BUFFER:
  123. self.flush_items()
    def flush_items(self, flush=False):
        """Chunk the buffered item metadata and store the finished chunks.

        With flush=True everything is written out; otherwise the last
        (possibly still growing) chunk is kept in the buffer so later
        writes can extend it.
        """
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        # Cut the buffered stream into content-defined chunks.
        chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        # Every chunk except the last is complete: store it now.
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            # Keep the trailing partial chunk buffered for the next round.
            self.items.write(chunks[-1])
  139. def write_checkpoint(self):
  140. self.save(self.checkpoint_name)
  141. del self.manifest.archives[self.checkpoint_name]
  142. self.cache.chunk_decref(self.id)
  143. def save(self, name=None):
  144. name = name or self.name
  145. if name in self.manifest.archives:
  146. raise self.AlreadyExists(name)
  147. self.flush_items(flush=True)
  148. metadata = {
  149. 'version': 1,
  150. 'name': name,
  151. 'items': self.items_ids,
  152. 'cmdline': sys.argv,
  153. 'hostname': socket.gethostname(),
  154. 'username': getuser(),
  155. 'time': datetime.utcnow().isoformat(),
  156. }
  157. data = msgpack.packb(metadata)
  158. self.id = self.key.id_hash(data)
  159. self.cache.add_chunk(self.id, data, self.stats)
  160. self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
  161. self.manifest.write()
  162. self.store.commit()
  163. self.cache.commit()
    def calc_stats(self, cache):
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        # NOTE(review): decrefs go through self.cache while the transaction
        # is begun/rolled back on the *cache* argument — presumably they are
        # the same object; verify against callers.
        # NOTE(review): the inner 'for id, ...' loop shadows the outer
        # metadata-chunk id that is used again after the item loop, and
        # nfiles is incremented once per chunk rather than per file —
        # both look like latent bugs; confirm against the project history.
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = Statistics()
        for id in self.metadata['items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        # A count of 1 means this chunk becomes unique.
                        count, _, _ = self.cache.chunks[id]
                        stats.update(size, csize, count == 1)
                        stats.nfiles += 1
                        self.cache.chunks[id] = count - 1, size, csize
                except KeyError:
                    pass
            # Account for the item-metadata chunk itself.
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        cache.rollback()
        return stats
    def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True, peek=None):
        """Recreate *item* (file/dir/symlink/fifo/device) under *dest*.

        :param dest: target directory (defaults to the original cwd)
        :param start_cb: callback invoked when extraction of a regular
            file's data begins
        :param restore_attrs: also restore permissions/owner/xattrs/mtime
        :param peek: passed through to store.get_many for read-ahead
        :raises Exception: on an unknown item mode
        """
        dest = dest or self.cwd
        # Archived paths must be relative (see process_*'s lstrip).
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, encode_filename(item['path']))
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                n = len(item['chunks'])
                ## 0 chunks indicates an empty (0 bytes) file
                if n == 0:
                    open(path, 'wb').close()
                    start_cb(item)
                else:
                    fd = open(path, 'wb')
                    start_cb(item)
                    # Request all chunk ids at once so the store can
                    # pipeline the fetches.
                    ids = [id for id, size, csize in item['chunks']]
                    for id, chunk in izip(ids, self.store.get_many(ids, peek)):
                        data = self.key.decrypt(id, chunk)
                        fd.write(data)
                    fd.close()
                    self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item['mode'], item['dev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])
  238. def restore_attrs(self, path, item, symlink=False):
  239. xattrs = item.get('xattrs')
  240. if xattrs:
  241. xa = xattr(path, XATTR_NOFOLLOW)
  242. for k, v in xattrs.items():
  243. try:
  244. xa.set(k, v)
  245. except (IOError, KeyError):
  246. pass
  247. if have_lchmod:
  248. os.lchmod(path, item['mode'])
  249. elif not symlink:
  250. os.chmod(path, item['mode'])
  251. uid = gid = None
  252. if not self.numeric_owner:
  253. uid = user2uid(item['user'])
  254. gid = group2gid(item['group'])
  255. uid = uid or item['uid']
  256. gid = gid or item['gid']
  257. try:
  258. os.lchown(path, uid, gid)
  259. except OSError:
  260. pass
  261. if not symlink:
  262. # FIXME: We should really call futimes here (c extension required)
  263. os.utime(path, (item['mtime'], item['mtime']))
  264. def verify_file(self, item, start, result, peek=None):
  265. if not item['chunks']:
  266. start(item)
  267. result(item, True)
  268. else:
  269. start(item)
  270. ids = [id for id, size, csize in item['chunks']]
  271. try:
  272. for id, chunk in izip(ids, self.store.get_many(ids, peek)):
  273. if chunk:
  274. self.key.decrypt(id, chunk)
  275. except Exception, e:
  276. result(item, False)
  277. return
  278. result(item, True)
    def delete(self, cache):
        """Remove this archive: decref all data and metadata chunks,
        drop the manifest entry and commit.

        NOTE(review): decrefs use self.cache while the final commit goes
        to the *cache* argument — presumably the same object; confirm.
        """
        unpacker = msgpack.Unpacker()
        for id in self.metadata['items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    # Items without a 'chunks' key carry no file data.
                    pass
            # Drop the item-metadata chunk itself.
            self.cache.chunk_decref(id)
        # Drop the archive metadata chunk and the manifest entry.
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.store.commit()
        cache.commit()
    def stat_attrs(self, st, path):
        """Build the common metadata dict (mode/owner/mtime/xattrs) for a
        filesystem entry described by stat result *st* at *path*."""
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        # With numeric_owner only the numeric ids are kept, never names.
        if self.numeric_owner:
            item['user'] = item['group'] = None
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                # NOTE(review): startswith('user') also matches names like
                # 'username...'; 'user.' may be intended — confirm.
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            # Filesystem without xattr support: store none.
            pass
        return item
  317. def process_item(self, path, st):
  318. item = {'path': path.lstrip('/\\:')}
  319. item.update(self.stat_attrs(st, path))
  320. self.add_item(item)
  321. def process_dev(self, path, st):
  322. item = {'path': path.lstrip('/\\:'), 'dev': st.st_dev}
  323. item.update(self.stat_attrs(st, path))
  324. self.add_item(item)
  325. def process_symlink(self, path, st):
  326. source = os.readlink(path)
  327. item = {'path': path.lstrip('/\\:'), 'source': source}
  328. item.update(self.stat_attrs(st, path))
  329. self.add_item(item)
    def process_file(self, path, st, cache):
        """Archive a regular file, reusing cached chunks when the file is
        known and unchanged, and chunkifying its content otherwise."""
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                # Subsequent link: store only a reference to the first path.
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path)
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                # for-else: runs only when no chunk was missing.
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            # Remember the chunk list so an unchanged file can skip
            # chunkification next time.
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
  365. @staticmethod
  366. def list_archives(store, key, manifest, cache=None):
  367. for name, info in manifest.archives.items():
  368. yield Archive(store, key, manifest, name, cache=cache)