archive.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. from itertools import izip_longest
  5. import msgpack
  6. import os
  7. import socket
  8. import stat
  9. import sys
  10. import time
  11. from cStringIO import StringIO
  12. from xattr import xattr, XATTR_NOFOLLOW
  13. from ._speedups import chunkify
  14. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  15. encode_filename, Statistics
  16. ITEMS_BUFFER = 1024 * 1024
  17. CHUNK_SIZE = 64 * 1024
  18. WINDOW_SIZE = 4096
  19. have_lchmod = hasattr(os, 'lchmod')
  20. linux = sys.platform == 'linux2'
  21. class ItemIter(object):
  22. def __init__(self, unpacker, filter):
  23. self.unpacker = iter(unpacker)
  24. self.filter = filter
  25. self.stack = []
  26. self.peeks = 0
  27. self._peek = None
  28. self._peek_iter = None
  29. global foo
  30. foo = self
  31. def __iter__(self):
  32. return self
  33. def next(self):
  34. if self.stack:
  35. item = self.stack.pop(0)
  36. else:
  37. self._peek = None
  38. item = self.get_next()
  39. self.peeks = max(0, self.peeks - len(item.get('chunks', [])))
  40. return item
  41. def get_next(self):
  42. next = self.unpacker.next()
  43. while self.filter and not self.filter(next):
  44. next = self.unpacker.next()
  45. return next
  46. def peek(self):
  47. while True:
  48. while not self._peek or not self._peek_iter:
  49. if self.peeks > 100:
  50. raise StopIteration
  51. self._peek = self.get_next()
  52. self.stack.append(self._peek)
  53. if 'chunks' in self._peek:
  54. self._peek_iter = iter(self._peek['chunks'])
  55. else:
  56. self._peek_iter = None
  57. try:
  58. item = self._peek_iter.next()
  59. self.peeks += 1
  60. return item
  61. except StopIteration:
  62. self._peek = None
  63. class Archive(object):
  64. class DoesNotExist(Exception):
  65. pass
  66. class AlreadyExists(Exception):
  67. pass
  68. def __init__(self, store, key, manifest, name, cache=None, create=False,
  69. checkpoint_interval=300, numeric_owner=False):
  70. if sys.platform == 'darwin':
  71. self.cwd = os.getcwdu()
  72. else:
  73. self.cwd = os.getcwd()
  74. self.key = key
  75. self.store = store
  76. self.cache = cache
  77. self.manifest = manifest
  78. self.items = StringIO()
  79. self.items_ids = []
  80. self.hard_links = {}
  81. self.stats = Statistics()
  82. self.name = name
  83. self.checkpoint_interval = checkpoint_interval
  84. self.numeric_owner = numeric_owner
  85. if create:
  86. if name in manifest.archives:
  87. raise self.AlreadyExists
  88. self.last_checkpoint = time.time()
  89. i = 0
  90. while True:
  91. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  92. if not self.checkpoint_name in manifest.archives:
  93. break
  94. i += 1
  95. else:
  96. try:
  97. info = self.manifest.archives[name]
  98. except KeyError:
  99. raise self.DoesNotExist
  100. self.load(info['id'])
  101. def load(self, id):
  102. self.id = id
  103. data = self.key.decrypt(self.id, self.store.get(self.id))
  104. self.metadata = msgpack.unpackb(data)
  105. if self.metadata['version'] != 1:
  106. raise Exception('Unknown archive metadata version')
  107. self.name = self.metadata['name']
  108. @property
  109. def ts(self):
  110. """Timestamp of archive creation in UTC"""
  111. t, f = self.metadata['time'].split('.', 1)
  112. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  113. def __repr__(self):
  114. return 'Archive(%r)' % self.name
  115. def iter_items(self, filter=None):
  116. unpacker = msgpack.Unpacker()
  117. for id in self.metadata['items']:
  118. unpacker.feed(self.key.decrypt(id, self.store.get(id)))
  119. iter = ItemIter(unpacker, filter)
  120. for item in iter:
  121. yield item, iter.peek
  122. def add_item(self, item):
  123. self.items.write(msgpack.packb(item))
  124. now = time.time()
  125. if now - self.last_checkpoint > self.checkpoint_interval:
  126. self.last_checkpoint = now
  127. self.write_checkpoint()
  128. if self.items.tell() > ITEMS_BUFFER:
  129. self.flush_items()
  130. def flush_items(self, flush=False):
  131. if self.items.tell() == 0:
  132. return
  133. self.items.seek(0)
  134. chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
  135. self.items.seek(0)
  136. self.items.truncate()
  137. for chunk in chunks[:-1]:
  138. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
  139. self.items_ids.append(id)
  140. if flush or len(chunks) == 1:
  141. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
  142. self.items_ids.append(id)
  143. else:
  144. self.items.write(chunks[-1])
  145. def write_checkpoint(self):
  146. self.save(self.checkpoint_name)
  147. del self.manifest.archives[self.checkpoint_name]
  148. self.cache.chunk_decref(self.id)
  149. def save(self, name=None):
  150. name = name or self.name
  151. if name in self.manifest.archives:
  152. raise self.AlreadyExists(name)
  153. self.flush_items(flush=True)
  154. metadata = {
  155. 'version': 1,
  156. 'name': name,
  157. 'items': self.items_ids,
  158. 'cmdline': sys.argv,
  159. 'hostname': socket.gethostname(),
  160. 'username': getuser(),
  161. 'time': datetime.utcnow().isoformat(),
  162. }
  163. data = msgpack.packb(metadata)
  164. self.id = self.key.id_hash(data)
  165. self.cache.add_chunk(self.id, data, self.stats)
  166. self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
  167. self.manifest.write()
  168. self.store.commit()
  169. self.cache.commit()
  170. def calc_stats(self, cache):
  171. def add(id):
  172. count, size, csize = self.cache.chunks[id]
  173. stats.update(size, csize, count == 1)
  174. self.cache.chunks[id] = count - 1, size, csize
  175. # This function is a bit evil since it abuses the cache to calculate
  176. # the stats. The cache transaction must be rolled back afterwards
  177. unpacker = msgpack.Unpacker()
  178. cache.begin_txn()
  179. stats = Statistics()
  180. add(self.id)
  181. for id, chunk in izip_longest(self.metadata['items'], self.store.get_many(self.metadata['items'])):
  182. unpacker.feed(self.key.decrypt(id, chunk))
  183. for item in unpacker:
  184. try:
  185. for id, size, csize in item['chunks']:
  186. add(id)
  187. stats.nfiles += 1
  188. except KeyError:
  189. pass
  190. add(id)
  191. cache.rollback()
  192. return stats
  193. def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True, peek=None):
  194. dest = dest or self.cwd
  195. assert item['path'][0] not in ('/', '\\', ':')
  196. path = os.path.join(dest, encode_filename(item['path']))
  197. mode = item['mode']
  198. if stat.S_ISDIR(mode):
  199. if not os.path.exists(path):
  200. os.makedirs(path)
  201. if restore_attrs:
  202. self.restore_attrs(path, item)
  203. elif stat.S_ISREG(mode):
  204. if not os.path.exists(os.path.dirname(path)):
  205. os.makedirs(os.path.dirname(path))
  206. # Hard link?
  207. if 'source' in item:
  208. source = os.path.join(dest, item['source'])
  209. if os.path.exists(path):
  210. os.unlink(path)
  211. os.link(source, path)
  212. else:
  213. n = len(item['chunks'])
  214. ## 0 chunks indicates an empty (0 bytes) file
  215. if n == 0:
  216. open(path, 'wb').close()
  217. start_cb(item)
  218. else:
  219. fd = open(path, 'wb')
  220. start_cb(item)
  221. ids = [id for id, size, csize in item['chunks']]
  222. for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
  223. data = self.key.decrypt(id, chunk)
  224. fd.write(data)
  225. fd.close()
  226. self.restore_attrs(path, item)
  227. elif stat.S_ISFIFO(mode):
  228. if not os.path.exists(os.path.dirname(path)):
  229. os.makedirs(os.path.dirname(path))
  230. os.mkfifo(path)
  231. self.restore_attrs(path, item)
  232. elif stat.S_ISLNK(mode):
  233. if not os.path.exists(os.path.dirname(path)):
  234. os.makedirs(os.path.dirname(path))
  235. source = item['source']
  236. if os.path.exists(path):
  237. os.unlink(path)
  238. os.symlink(source, path)
  239. self.restore_attrs(path, item, symlink=True)
  240. elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
  241. os.mknod(path, item['mode'], item['dev'])
  242. self.restore_attrs(path, item)
  243. else:
  244. raise Exception('Unknown archive item type %r' % item['mode'])
  245. def restore_attrs(self, path, item, symlink=False):
  246. xattrs = item.get('xattrs')
  247. if xattrs:
  248. xa = xattr(path, XATTR_NOFOLLOW)
  249. for k, v in xattrs.items():
  250. try:
  251. xa.set(k, v)
  252. except (IOError, KeyError):
  253. pass
  254. if have_lchmod:
  255. os.lchmod(path, item['mode'])
  256. elif not symlink:
  257. os.chmod(path, item['mode'])
  258. uid = gid = None
  259. if not self.numeric_owner:
  260. uid = user2uid(item['user'])
  261. gid = group2gid(item['group'])
  262. uid = uid or item['uid']
  263. gid = gid or item['gid']
  264. try:
  265. os.lchown(path, uid, gid)
  266. except OSError:
  267. pass
  268. if not symlink:
  269. # FIXME: We should really call futimes here (c extension required)
  270. os.utime(path, (item['mtime'], item['mtime']))
  271. def verify_file(self, item, start, result, peek=None):
  272. if not item['chunks']:
  273. start(item)
  274. result(item, True)
  275. else:
  276. start(item)
  277. ids = [id for id, size, csize in item['chunks']]
  278. try:
  279. for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
  280. self.key.decrypt(id, chunk)
  281. except Exception, e:
  282. result(item, False)
  283. return
  284. result(item, True)
  285. def delete(self, cache):
  286. unpacker = msgpack.Unpacker()
  287. for id in self.metadata['items']:
  288. unpacker.feed(self.key.decrypt(id, self.store.get(id)))
  289. for item in unpacker:
  290. try:
  291. for chunk_id, size, csize in item['chunks']:
  292. self.cache.chunk_decref(chunk_id)
  293. except KeyError:
  294. pass
  295. self.cache.chunk_decref(id)
  296. self.cache.chunk_decref(self.id)
  297. del self.manifest.archives[self.name]
  298. self.manifest.write()
  299. self.store.commit()
  300. cache.commit()
  301. def stat_attrs(self, st, path):
  302. item = {
  303. 'mode': st.st_mode,
  304. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  305. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  306. 'mtime': st.st_mtime,
  307. }
  308. if self.numeric_owner:
  309. item['user'] = item['group'] = None
  310. try:
  311. xa = xattr(path, XATTR_NOFOLLOW)
  312. xattrs = {}
  313. for key in xa:
  314. # Only store the user namespace on Linux
  315. if linux and not key.startswith('user'):
  316. continue
  317. xattrs[key] = xa[key]
  318. if xattrs:
  319. item['xattrs'] = xattrs
  320. except IOError:
  321. pass
  322. return item
  323. def process_item(self, path, st):
  324. item = {'path': path.lstrip('/\\:')}
  325. item.update(self.stat_attrs(st, path))
  326. self.add_item(item)
  327. def process_dev(self, path, st):
  328. item = {'path': path.lstrip('/\\:'), 'dev': st.st_dev}
  329. item.update(self.stat_attrs(st, path))
  330. self.add_item(item)
  331. def process_symlink(self, path, st):
  332. source = os.readlink(path)
  333. item = {'path': path.lstrip('/\\:'), 'source': source}
  334. item.update(self.stat_attrs(st, path))
  335. self.add_item(item)
  336. def process_file(self, path, st, cache):
  337. safe_path = path.lstrip('/\\:')
  338. # Is it a hard link?
  339. if st.st_nlink > 1:
  340. source = self.hard_links.get((st.st_ino, st.st_dev))
  341. if (st.st_ino, st.st_dev) in self.hard_links:
  342. item = self.stat_attrs(st, path)
  343. item.update({'path': safe_path, 'source': source})
  344. self.add_item(item)
  345. return
  346. else:
  347. self.hard_links[st.st_ino, st.st_dev] = safe_path
  348. path_hash = self.key.id_hash(path)
  349. ids = cache.file_known_and_unchanged(path_hash, st)
  350. chunks = None
  351. if ids is not None:
  352. # Make sure all ids are available
  353. for id in ids:
  354. if not cache.seen_chunk(id):
  355. break
  356. else:
  357. chunks = [cache.chunk_incref(id, self.stats) for id in ids]
  358. # Only chunkify the file if needed
  359. if chunks is None:
  360. with open(path, 'rb') as fd:
  361. chunks = []
  362. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  363. self.key.chunk_seed):
  364. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
  365. ids = [id for id, _, _ in chunks]
  366. cache.memorize_file(path_hash, st, ids)
  367. item = {'path': safe_path, 'chunks': chunks}
  368. item.update(self.stat_attrs(st, path))
  369. self.stats.nfiles += 1
  370. self.add_item(item)
  371. @staticmethod
  372. def list_archives(store, key, manifest, cache=None):
  373. for name, info in manifest.archives.items():
  374. yield Archive(store, key, manifest, name, cache=cache)