# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. from itertools import izip_longest
  5. import msgpack
  6. import os
  7. import socket
  8. import stat
  9. import sys
  10. import time
  11. from cStringIO import StringIO
  12. from xattr import xattr, XATTR_NOFOLLOW
  13. from ._speedups import chunkify
  14. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  15. encode_filename, Statistics
  16. ITEMS_BUFFER = 1024 * 1024
  17. CHUNK_SIZE = 64 * 1024
  18. WINDOW_SIZE = 4096
  19. have_lchmod = hasattr(os, 'lchmod')
  20. linux = sys.platform == 'linux2'
  21. class ItemIter(object):
  22. def __init__(self, unpacker, filter):
  23. self.unpacker = iter(unpacker)
  24. self.filter = filter
  25. self.stack = []
  26. self.peeks = 0
  27. self._peek = None
  28. self._peek_iter = None
  29. global foo
  30. foo = self
  31. def __iter__(self):
  32. return self
  33. def next(self):
  34. if self.stack:
  35. item = self.stack.pop(0)
  36. else:
  37. self._peek = None
  38. item = self.get_next()
  39. self.peeks = max(0, self.peeks - len(item.get('chunks', [])))
  40. return item
  41. def get_next(self):
  42. next = self.unpacker.next()
  43. while self.filter and not self.filter(next):
  44. next = self.unpacker.next()
  45. return next
  46. def peek(self):
  47. while True:
  48. while not self._peek or not self._peek_iter:
  49. if self.peeks > 100:
  50. raise StopIteration
  51. self._peek = self.get_next()
  52. self.stack.append(self._peek)
  53. if 'chunks' in self._peek:
  54. self._peek_iter = iter(self._peek['chunks'])
  55. else:
  56. self._peek_iter = None
  57. try:
  58. item = self._peek_iter.next()
  59. self.peeks += 1
  60. return item
  61. except StopIteration:
  62. self._peek = None
  63. class Archive(object):
  64. class DoesNotExist(Exception):
  65. pass
  66. class AlreadyExists(Exception):
  67. pass
  68. def __init__(self, store, key, manifest, name, cache=None, create=False,
  69. checkpoint_interval=300, numeric_owner=False):
  70. if sys.platform == 'darwin':
  71. self.cwd = os.getcwdu()
  72. else:
  73. self.cwd = os.getcwd()
  74. self.key = key
  75. self.store = store
  76. self.cache = cache
  77. self.manifest = manifest
  78. self.items = StringIO()
  79. self.items_ids = []
  80. self.hard_links = {}
  81. self.stats = Statistics()
  82. self.name = name
  83. self.checkpoint_interval = checkpoint_interval
  84. self.numeric_owner = numeric_owner
  85. if create:
  86. if name in manifest.archives:
  87. raise self.AlreadyExists(name)
  88. self.last_checkpoint = time.time()
  89. i = 0
  90. while True:
  91. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  92. if not self.checkpoint_name in manifest.archives:
  93. break
  94. i += 1
  95. else:
  96. if name not in self.manifest.archives:
  97. raise self.DoesNotExist(name)
  98. info = self.manifest.archives[name]
  99. self.load(info['id'])
  100. def load(self, id):
  101. self.id = id
  102. data = self.key.decrypt(self.id, self.store.get(self.id))
  103. self.metadata = msgpack.unpackb(data)
  104. if self.metadata['version'] != 1:
  105. raise Exception('Unknown archive metadata version')
  106. self.name = self.metadata['name']
  107. @property
  108. def ts(self):
  109. """Timestamp of archive creation in UTC"""
  110. t, f = self.metadata['time'].split('.', 1)
  111. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  112. def __repr__(self):
  113. return 'Archive(%r)' % self.name
  114. def iter_items(self, filter=None):
  115. unpacker = msgpack.Unpacker()
  116. i = 0
  117. n = 20
  118. while True:
  119. items = self.metadata['items'][i:i + n]
  120. i += n
  121. if not items:
  122. break
  123. for id, chunk in [(id, chunk) for id, chunk in izip_longest(items, self.store.get_many(items))]:
  124. unpacker.feed(self.key.decrypt(id, chunk))
  125. iter = ItemIter(unpacker, filter)
  126. for item in iter:
  127. yield item, iter.peek
  128. def add_item(self, item):
  129. self.items.write(msgpack.packb(item))
  130. now = time.time()
  131. if now - self.last_checkpoint > self.checkpoint_interval:
  132. self.last_checkpoint = now
  133. self.write_checkpoint()
  134. if self.items.tell() > ITEMS_BUFFER:
  135. self.flush_items()
  136. def flush_items(self, flush=False):
  137. if self.items.tell() == 0:
  138. return
  139. self.items.seek(0)
  140. chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
  141. self.items.seek(0)
  142. self.items.truncate()
  143. for chunk in chunks[:-1]:
  144. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
  145. self.items_ids.append(id)
  146. if flush or len(chunks) == 1:
  147. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
  148. self.items_ids.append(id)
  149. else:
  150. self.items.write(chunks[-1])
  151. def write_checkpoint(self):
  152. self.save(self.checkpoint_name)
  153. del self.manifest.archives[self.checkpoint_name]
  154. self.cache.chunk_decref(self.id)
  155. def save(self, name=None):
  156. name = name or self.name
  157. if name in self.manifest.archives:
  158. raise self.AlreadyExists(name)
  159. self.flush_items(flush=True)
  160. metadata = {
  161. 'version': 1,
  162. 'name': name,
  163. 'items': self.items_ids,
  164. 'cmdline': sys.argv,
  165. 'hostname': socket.gethostname(),
  166. 'username': getuser(),
  167. 'time': datetime.utcnow().isoformat(),
  168. }
  169. data = msgpack.packb(metadata)
  170. self.id = self.key.id_hash(data)
  171. self.cache.add_chunk(self.id, data, self.stats)
  172. self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
  173. self.manifest.write()
  174. self.store.commit()
  175. self.cache.commit()
  176. def calc_stats(self, cache):
  177. def add(id):
  178. count, size, csize = self.cache.chunks[id]
  179. stats.update(size, csize, count == 1)
  180. self.cache.chunks[id] = count - 1, size, csize
  181. # This function is a bit evil since it abuses the cache to calculate
  182. # the stats. The cache transaction must be rolled back afterwards
  183. unpacker = msgpack.Unpacker()
  184. cache.begin_txn()
  185. stats = Statistics()
  186. add(self.id)
  187. for id, chunk in izip_longest(self.metadata['items'], self.store.get_many(self.metadata['items'])):
  188. add(id)
  189. unpacker.feed(self.key.decrypt(id, chunk))
  190. for item in unpacker:
  191. try:
  192. for id, size, csize in item['chunks']:
  193. add(id)
  194. stats.nfiles += 1
  195. except KeyError:
  196. pass
  197. cache.rollback()
  198. return stats
  199. def extract_item(self, item, dest=None, restore_attrs=True, peek=None):
  200. dest = dest or self.cwd
  201. assert item['path'][0] not in ('/', '\\', ':')
  202. path = os.path.join(dest, encode_filename(item['path']))
  203. # Attempt to remove existing files, ignore errors on failure
  204. try:
  205. st = os.lstat(path)
  206. if stat.S_ISDIR(st.st_mode):
  207. os.rmdir(path)
  208. else:
  209. os.unlink(path)
  210. except OSError:
  211. pass
  212. mode = item['mode']
  213. if stat.S_ISDIR(mode):
  214. if not os.path.exists(path):
  215. os.makedirs(path)
  216. if restore_attrs:
  217. self.restore_attrs(path, item)
  218. elif stat.S_ISREG(mode):
  219. if not os.path.exists(os.path.dirname(path)):
  220. os.makedirs(os.path.dirname(path))
  221. # Hard link?
  222. if 'source' in item:
  223. source = os.path.join(dest, item['source'])
  224. if os.path.exists(path):
  225. os.unlink(path)
  226. os.link(source, path)
  227. else:
  228. with open(path, 'wbx') as fd:
  229. ids = [id for id, size, csize in item['chunks']]
  230. for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
  231. data = self.key.decrypt(id, chunk)
  232. fd.write(data)
  233. self.restore_attrs(path, item)
  234. elif stat.S_ISFIFO(mode):
  235. if not os.path.exists(os.path.dirname(path)):
  236. os.makedirs(os.path.dirname(path))
  237. os.mkfifo(path)
  238. self.restore_attrs(path, item)
  239. elif stat.S_ISLNK(mode):
  240. if not os.path.exists(os.path.dirname(path)):
  241. os.makedirs(os.path.dirname(path))
  242. source = item['source']
  243. if os.path.exists(path):
  244. os.unlink(path)
  245. os.symlink(source, path)
  246. self.restore_attrs(path, item, symlink=True)
  247. elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
  248. os.mknod(path, item['mode'], item['rdev'])
  249. self.restore_attrs(path, item)
  250. else:
  251. raise Exception('Unknown archive item type %r' % item['mode'])
  252. def restore_attrs(self, path, item, symlink=False):
  253. xattrs = item.get('xattrs')
  254. if xattrs:
  255. xa = xattr(path, XATTR_NOFOLLOW)
  256. for k, v in xattrs.items():
  257. try:
  258. xa.set(k, v)
  259. except (IOError, KeyError):
  260. pass
  261. if have_lchmod:
  262. os.lchmod(path, item['mode'])
  263. elif not symlink:
  264. os.chmod(path, item['mode'])
  265. uid = gid = None
  266. if not self.numeric_owner:
  267. uid = user2uid(item['user'])
  268. gid = group2gid(item['group'])
  269. uid = uid or item['uid']
  270. gid = gid or item['gid']
  271. try:
  272. os.lchown(path, uid, gid)
  273. except OSError:
  274. pass
  275. if not symlink:
  276. # FIXME: We should really call futimes here (c extension required)
  277. os.utime(path, (item['mtime'], item['mtime']))
  278. def verify_file(self, item, start, result, peek=None):
  279. if not item['chunks']:
  280. start(item)
  281. result(item, True)
  282. else:
  283. start(item)
  284. ids = [id for id, size, csize in item['chunks']]
  285. try:
  286. for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
  287. self.key.decrypt(id, chunk)
  288. except Exception:
  289. result(item, False)
  290. return
  291. result(item, True)
  292. def delete(self, cache):
  293. unpacker = msgpack.Unpacker()
  294. for id in self.metadata['items']:
  295. unpacker.feed(self.key.decrypt(id, self.store.get(id)))
  296. for item in unpacker:
  297. try:
  298. for chunk_id, size, csize in item['chunks']:
  299. self.cache.chunk_decref(chunk_id)
  300. except KeyError:
  301. pass
  302. self.cache.chunk_decref(id)
  303. self.cache.chunk_decref(self.id)
  304. del self.manifest.archives[self.name]
  305. self.manifest.write()
  306. self.store.commit()
  307. cache.commit()
  308. def stat_attrs(self, st, path):
  309. item = {
  310. 'mode': st.st_mode,
  311. 'uid': st.st_uid, 'user': uid2user(st.st_uid),
  312. 'gid': st.st_gid, 'group': gid2group(st.st_gid),
  313. 'mtime': st.st_mtime,
  314. }
  315. if self.numeric_owner:
  316. item['user'] = item['group'] = None
  317. try:
  318. xa = xattr(path, XATTR_NOFOLLOW)
  319. xattrs = {}
  320. for key in xa:
  321. # Only store the user namespace on Linux
  322. if linux and not key.startswith('user'):
  323. continue
  324. xattrs[key] = xa[key]
  325. if xattrs:
  326. item['xattrs'] = xattrs
  327. except IOError:
  328. pass
  329. return item
  330. def process_item(self, path, st):
  331. item = {'path': path.lstrip('/\\:')}
  332. item.update(self.stat_attrs(st, path))
  333. self.add_item(item)
  334. def process_dev(self, path, st):
  335. item = {'path': path.lstrip('/\\:'), 'rdev': st.st_rdev}
  336. item.update(self.stat_attrs(st, path))
  337. self.add_item(item)
  338. def process_symlink(self, path, st):
  339. source = os.readlink(path)
  340. item = {'path': path.lstrip('/\\:'), 'source': source}
  341. item.update(self.stat_attrs(st, path))
  342. self.add_item(item)
  343. def process_file(self, path, st, cache):
  344. safe_path = path.lstrip('/\\:')
  345. # Is it a hard link?
  346. if st.st_nlink > 1:
  347. source = self.hard_links.get((st.st_ino, st.st_dev))
  348. if (st.st_ino, st.st_dev) in self.hard_links:
  349. item = self.stat_attrs(st, path)
  350. item.update({'path': safe_path, 'source': source})
  351. self.add_item(item)
  352. return
  353. else:
  354. self.hard_links[st.st_ino, st.st_dev] = safe_path
  355. path_hash = self.key.id_hash(path)
  356. ids = cache.file_known_and_unchanged(path_hash, st)
  357. chunks = None
  358. if ids is not None:
  359. # Make sure all ids are available
  360. for id in ids:
  361. if not cache.seen_chunk(id):
  362. break
  363. else:
  364. chunks = [cache.chunk_incref(id, self.stats) for id in ids]
  365. # Only chunkify the file if needed
  366. if chunks is None:
  367. with open(path, 'rb') as fd:
  368. chunks = []
  369. for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
  370. self.key.chunk_seed):
  371. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
  372. ids = [id for id, _, _ in chunks]
  373. cache.memorize_file(path_hash, st, ids)
  374. item = {'path': safe_path, 'chunks': chunks}
  375. item.update(self.stat_attrs(st, path))
  376. self.stats.nfiles += 1
  377. self.add_item(item)
  378. @staticmethod
  379. def list_archives(store, key, manifest, cache=None):
  380. for name, info in manifest.archives.items():
  381. yield Archive(store, key, manifest, name, cache=cache)