# archive.py
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. from itertools import izip_longest
  5. import msgpack
  6. import os
  7. import socket
  8. import stat
  9. import sys
  10. import time
  11. from cStringIO import StringIO
  12. from xattr import xattr, XATTR_NOFOLLOW
  13. from .chunker import chunkify
  14. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  15. encode_filename, Statistics
# Flush the in-memory item metadata buffer to the store once it exceeds this size.
ITEMS_BUFFER = 1024 * 1024
# Content-defined chunker parameters: minimum chunk size, rolling-hash
# window size and boundary mask (passed to chunkify below).
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff
# os.lchmod only exists on some platforms (e.g. BSD / OS X).
have_lchmod = hasattr(os, 'lchmod')
# Python 2 reports Linux as 'linux2'.
linux = sys.platform == 'linux2'
  22. class ItemIter(object):
  23. def __init__(self, unpacker, filter):
  24. self.unpacker = iter(unpacker)
  25. self.filter = filter
  26. self.stack = []
  27. self.peeks = 0
  28. self._peek = None
  29. self._peek_iter = None
  30. global foo
  31. foo = self
  32. def __iter__(self):
  33. return self
  34. def next(self):
  35. if self.stack:
  36. item = self.stack.pop(0)
  37. else:
  38. self._peek = None
  39. item = self.get_next()
  40. self.peeks = max(0, self.peeks - len(item.get('chunks', [])))
  41. return item
  42. def get_next(self):
  43. next = self.unpacker.next()
  44. while self.filter and not self.filter(next):
  45. next = self.unpacker.next()
  46. return next
  47. def peek(self):
  48. while True:
  49. while not self._peek or not self._peek_iter:
  50. if self.peeks > 100:
  51. raise StopIteration
  52. self._peek = self.get_next()
  53. self.stack.append(self._peek)
  54. if 'chunks' in self._peek:
  55. self._peek_iter = iter(self._peek['chunks'])
  56. else:
  57. self._peek_iter = None
  58. try:
  59. item = self._peek_iter.next()
  60. self.peeks += 1
  61. return item
  62. except StopIteration:
  63. self._peek = None
class Archive(object):
    """A single named backup archive stored in a store.

    An archive consists of a msgpack-encoded metadata chunk (version,
    name, list of item-chunk ids, timestamp, ...) plus the item chunks
    themselves; each item describes one filesystem object and, for
    regular files, the ids of its data chunks.  Instances either create
    a new archive (create=True) or load an existing one by name from
    the manifest.
    """

    class DoesNotExist(Exception):
        """Raised when opening an archive name missing from the manifest."""
        pass

    class AlreadyExists(Exception):
        """Raised when creating/saving under a name already in the manifest."""
        pass

    def __init__(self, store, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        """Open or create archive *name*.

        checkpoint_interval -- seconds between intermediate checkpoint saves
        numeric_owner -- store/restore numeric uid/gid only, ignoring names
        """
        # Remember the working directory; used as the default extraction root.
        # getcwdu() returns unicode, needed for path handling on OS X.
        if sys.platform == 'darwin':
            self.cwd = os.getcwdu()
        else:
            self.cwd = os.getcwd()
        self.key = key
        self.store = store
        self.cache = cache
        self.manifest = manifest
        # In-memory buffer of packed item records, flushed by flush_items().
        self.items = StringIO()
        # Ids of item chunks already written to the store.
        self.items_ids = []
        # (st_ino, st_dev) -> archived path, for hard link detection.
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            # Find an unused checkpoint name: "<name>.checkpoint[.1][.2]..."
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if not self.checkpoint_name in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info['id'])

    def load(self, id):
        """Fetch, decrypt and unpack the archive metadata chunk *id*."""
        self.id = id
        data = self.key.decrypt(self.id, self.store.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata['version'] != 1:
            raise Exception('Unknown archive metadata version')
        self.name = self.metadata['name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        # Split off the fractional seconds and re-add them as a timedelta
        # (strptime parses only the whole-second part here).
        t, f = self.metadata['time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None):
        """Yield (item, peek) pairs for the archive's items.

        Item chunks are fetched from the store in batches of 20 and fed
        into a msgpack unpacker; *peek* is the bound ItemIter.peek method,
        which callers use to prefetch upcoming data chunk ids.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        i = 0
        n = 20
        while True:
            items = self.metadata['items'][i:i + n]
            i += n
            if not items:
                break
            # Pair each requested id with the corresponding fetched chunk.
            for id, chunk in [(id, chunk) for id, chunk in izip_longest(items, self.store.get_many(items))]:
                unpacker.feed(self.key.decrypt(id, chunk))
            iter = ItemIter(unpacker, filter)
            for item in iter:
                yield item, iter.peek

    def add_item(self, item):
        """Buffer one packed item record; checkpoint and flush when due."""
        self.items.write(msgpack.packb(item))
        now = time.time()
        if now - self.last_checkpoint > self.checkpoint_interval:
            self.last_checkpoint = now
            self.write_checkpoint()
        if self.items.tell() > ITEMS_BUFFER:
            self.flush_items()

    def flush_items(self, flush=False):
        """Chunk the buffered item metadata and write chunks to the store.

        The final chunk is normally written back into the buffer (so it
        can keep growing with later items); it is stored too when
        flush=True or it is the only chunk.
        """
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        chunks = list(str(s) for s in chunkify(self.items, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            # Keep the last (possibly still-growing) chunk buffered.
            self.items.write(chunks[-1])

    def write_checkpoint(self):
        """Save an intermediate archive under the checkpoint name, then
        remove its manifest entry and drop the metadata chunk reference.

        NOTE(review): the entry deletion/decref appear to be persisted
        only by a later save()/commit — confirm against manifest/cache
        semantics.
        """
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)

    def save(self, name=None):
        """Flush all items, write the archive metadata chunk, register it
        in the manifest and commit both store and cache."""
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata)
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.store.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        """Return Statistics (sizes, unique sizes, file count) for this archive."""
        def add(id):
            # Decrement the cached refcount; count == 1 means this archive
            # held the only reference, i.e. the chunk is unique to it.
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in izip_longest(self.metadata['items'], self.store.get_many(self.metadata['items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                try:
                    # Items without 'chunks' (dirs, symlinks, ...) raise KeyError.
                    for id, size, csize in item['chunks']:
                        add(id)
                    stats.nfiles += 1
                except KeyError:
                    pass
        cache.rollback()
        return stats

    def extract_item(self, item, dest=None, restore_attrs=True, peek=None):
        """Recreate *item* on the filesystem under *dest* (default: cwd)."""
        dest = dest or self.cwd
        # Stored paths must be relative (absolute markers were stripped on add).
        assert item['path'][0] not in ('/', '\\', ':')
        path = os.path.join(dest, encode_filename(item['path']))
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item['mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if 'source' in item:
                source = os.path.join(dest, item['source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                # NOTE(review): mode 'wbx' relies on the platform fopen
                # accepting 'x' (exclusive create) — confirm portability.
                with open(path, 'wbx') as fd:
                    ids = [id for id, size, csize in item['chunks']]
                    for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
                        data = self.key.decrypt(id, chunk)
                        fd.write(data)
                self.restore_attrs(path, item)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item['source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item['mode'], item['rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item['mode'])

    def restore_attrs(self, path, item, symlink=False):
        """Restore xattrs, ownership, mode and mtime onto *path*.

        Best-effort: xattr and chown failures are silently ignored
        (restoring as non-root commonly lacks permission for them).
        """
        xattrs = item.get('xattrs')
        if xattrs:
            xa = xattr(path, XATTR_NOFOLLOW)
            for k, v in xattrs.items():
                try:
                    xa.set(k, v)
                except (IOError, KeyError):
                    pass
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item['user'])
            gid = group2gid(item['group'])
        # Fall back to the stored numeric ids when name lookup yields nothing.
        # NOTE: a lookup result of 0 (root) is falsy and also falls back.
        uid = uid or item['uid']
        gid = gid or item['gid']
        try:
            os.lchown(path, uid, gid)
        except OSError:
            pass
        if have_lchmod:
            os.lchmod(path, item['mode'])
        elif not symlink:
            os.chmod(path, item['mode'])
        if not symlink:
            # FIXME: We should really call futimes here (c extension required)
            os.utime(path, (item['mtime'], item['mtime']))

    def verify_file(self, item, start, result, peek=None):
        """Verify a file item by decrypting every data chunk.

        start(item) is called before verification and result(item, ok)
        afterwards; any exception during decryption counts as failure.
        """
        if not item['chunks']:
            start(item)
            result(item, True)
        else:
            start(item)
            ids = [id for id, size, csize in item['chunks']]
            try:
                for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
                    self.key.decrypt(id, chunk)
            except Exception:
                result(item, False)
                return
            result(item, True)

    def delete(self, cache):
        """Delete this archive: decref all data, item and metadata chunks,
        drop the manifest entry and commit store and cache."""
        unpacker = msgpack.Unpacker(use_list=False)
        for id in self.metadata['items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    # Items without 'chunks' raise KeyError and are skipped.
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    pass
            self.cache.chunk_decref(id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.store.commit()
        cache.commit()

    def stat_attrs(self, st, path):
        """Build the common item attributes (mode, owner, mtime, xattrs)
        from a stat result."""
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        if self.numeric_owner:
            item['user'] = item['group'] = None
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            pass
        return item

    def process_item(self, path, st):
        """Archive a plain (non-file, non-device) filesystem item."""
        item = {'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        """Archive a block/character device item (stores st_rdev)."""
        item = {'path': path.lstrip('/\\:'), 'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        """Archive a symlink item (stores the link target)."""
        source = os.readlink(path)
        item = {'path': path.lstrip('/\\:'), 'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
        """Archive a regular file, reusing cached chunk ids when the file
        is known and unchanged; otherwise chunkify and store its data."""
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                # Second (or later) link: store a reference to the first path.
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path)
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(store, key, manifest, cache=None):
        """Yield an Archive object for every archive in the manifest."""
        for name, info in manifest.archives.items():
            yield Archive(store, key, manifest, name, cache=cache)