archive.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. from __future__ import with_statement
  2. from datetime import datetime, timedelta
  3. from getpass import getuser
  4. from itertools import izip
  5. import msgpack
  6. import os
  7. import socket
  8. import stat
  9. import sys
  10. import time
  11. from cStringIO import StringIO
  12. from xattr import xattr, XATTR_NOFOLLOW
  13. from ._speedups import chunkify
  14. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  15. encode_filename, Statistics
# Flush the buffered item metadata stream once it grows past this size.
ITEMS_BUFFER = 1024 * 1024
# Target chunk size and rolling-hash window used by the chunker.
CHUNK_SIZE = 64 * 1024
WINDOW_SIZE = 4096
# lchmod() only exists on some platforms (e.g. BSD / OS X).
have_lchmod = hasattr(os, 'lchmod')
# 'linux2' is the Python 2 sys.platform value for Linux.
linux = sys.platform == 'linux2'
class Archive(object):
    """A named backup archive stored as encrypted chunks in a store.

    Buffers item metadata in memory, chunks and stores it, and records
    the archive in the manifest on save().
    """

    class DoesNotExist(Exception):
        """Requested archive name is not present in the manifest."""
        pass

    class AlreadyExists(Exception):
        """Archive name is already present in the manifest."""
        pass
  26. def __init__(self, store, key, manifest, name, cache=None, create=False,
  27. checkpoint_interval=300, numeric_owner=False):
  28. if sys.platform == 'darwin':
  29. self.cwd = os.getcwdu()
  30. else:
  31. self.cwd = os.getcwd()
  32. self.key = key
  33. self.store = store
  34. self.cache = cache
  35. self.manifest = manifest
  36. self.items = StringIO()
  37. self.items_ids = []
  38. self.hard_links = {}
  39. self.stats = Statistics()
  40. self.name = name
  41. self.checkpoint_interval = checkpoint_interval
  42. self.numeric_owner = numeric_owner
  43. if create:
  44. if name in manifest.archives:
  45. raise self.AlreadyExists
  46. self.last_checkpoint = time.time()
  47. i = 0
  48. while True:
  49. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  50. if not self.checkpoint_name in manifest.archives:
  51. break
  52. i += 1
  53. else:
  54. try:
  55. info = self.manifest.archives[name]
  56. except KeyError:
  57. raise self.DoesNotExist
  58. self.load(info['id'])
  59. def load(self, id):
  60. self.id = id
  61. data = self.key.decrypt(self.id, self.store.get(self.id))
  62. self.metadata = msgpack.unpackb(data)
  63. if self.metadata['version'] != 1:
  64. raise Exception('Unknown archive metadata version')
  65. self.name = self.metadata['name']
  66. @property
  67. def ts(self):
  68. """Timestamp of archive creation in UTC"""
  69. t, f = self.metadata['time'].split('.', 1)
  70. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  71. def __repr__(self):
  72. return 'Archive(%r)' % self.name
  73. def iter_items(self):
  74. unpacker = msgpack.Unpacker()
  75. for id in self.metadata['items']:
  76. unpacker.feed(self.key.decrypt(id, self.store.get(id)))
  77. for item in unpacker:
  78. yield item
  79. def add_item(self, item):
  80. self.items.write(msgpack.packb(item))
  81. now = time.time()
  82. if now - self.last_checkpoint > self.checkpoint_interval:
  83. self.last_checkpoint = now
  84. self.write_checkpoint()
  85. if self.items.tell() > ITEMS_BUFFER:
  86. self.flush_items()
    def flush_items(self, flush=False):
        """Chunk the buffered item stream and store the chunks.

        Unless *flush* is true, the trailing (possibly short) chunk is
        written back into the buffer so later items can extend it.
        """
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        # chunkify yields buffer-like objects; copy them to plain strings.
        chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        # Every chunk but the last is final -- store it now.
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            # Keep the trailing partial chunk buffered for the next call.
            self.items.write(chunks[-1])
    def write_checkpoint(self):
        """Save the archive under its checkpoint name, then immediately
        remove the manifest entry and drop the metadata chunk reference;
        the already-stored data chunks remain as the recovery point."""
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)
    def save(self, name=None):
        """Flush buffered items, write the archive metadata chunk,
        register the archive in the manifest and commit everything.

        :param name: archive name to save under; defaults to self.name
        :raises Archive.AlreadyExists: name is already in the manifest
        """
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        # Force out the trailing partial items chunk as well.
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata)
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.store.commit()
        self.cache.commit()
    def calc_stats(self, cache):
        """Walk all item metadata and compute size/dedup statistics.

        NOTE(review): the inner loop rebinds ``id``, so the trailing
        ``self.cache.chunks[id]`` lookup uses the last *data* chunk id,
        not the items-metadata chunk id from the outer loop -- looks like
        variable shadowing; confirm intended.
        NOTE(review): refcounts are read/written via ``self.cache`` while
        the transaction is opened/rolled back on the *cache* argument --
        presumably the same object; verify against callers.
        """
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker()
        cache.begin_txn()
        stats = Statistics()
        for id in self.metadata['items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    for id, size, csize in item['chunks']:
                        count, _, _ = self.cache.chunks[id]
                        # count == 1: we hold the only reference, i.e. the
                        # chunk is unique to this archive.
                        stats.update(size, csize, count == 1)
                        # NOTE(review): nfiles is incremented once per
                        # *chunk*, not per file -- confirm intended.
                        stats.nfiles += 1
                        self.cache.chunks[id] = count - 1, size, csize
                except KeyError:
                    # Item without 'chunks' (directory, symlink, ...).
                    pass
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        cache.rollback()
        return stats
  149. def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True):
  150. dest = dest or self.cwd
  151. assert item['path'][0] not in ('/', '\\', ':')
  152. path = os.path.join(dest, encode_filename(item['path']))
  153. mode = item['mode']
  154. if stat.S_ISDIR(mode):
  155. if not os.path.exists(path):
  156. os.makedirs(path)
  157. if restore_attrs:
  158. self.restore_attrs(path, item)
  159. elif stat.S_ISREG(mode):
  160. if not os.path.exists(os.path.dirname(path)):
  161. os.makedirs(os.path.dirname(path))
  162. # Hard link?
  163. if 'source' in item:
  164. source = os.path.join(dest, item['source'])
  165. if os.path.exists(path):
  166. os.unlink(path)
  167. os.link(source, path)
  168. else:
  169. n = len(item['chunks'])
  170. ## 0 chunks indicates an empty (0 bytes) file
  171. if n == 0:
  172. open(path, 'wb').close()
  173. start_cb(item)
  174. else:
  175. fd = open(path, 'wb')
  176. start_cb(item)
  177. ids = [id for id, size, csize in item['chunks']]
  178. for id, chunk in izip(ids, self.store.get_many(ids)):
  179. data = self.key.decrypt(id, chunk)
  180. fd.write(data)
  181. fd.close()
  182. self.restore_attrs(path, item)
  183. elif stat.S_ISFIFO(mode):
  184. if not os.path.exists(os.path.dirname(path)):
  185. os.makedirs(os.path.dirname(path))
  186. os.mkfifo(path)
  187. self.restore_attrs(path, item)
  188. elif stat.S_ISLNK(mode):
  189. if not os.path.exists(os.path.dirname(path)):
  190. os.makedirs(os.path.dirname(path))
  191. source = item['source']
  192. if os.path.exists(path):
  193. os.unlink(path)
  194. os.symlink(source, path)
  195. self.restore_attrs(path, item, symlink=True)
  196. elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
  197. os.mknod(path, item['mode'], item['dev'])
  198. self.restore_attrs(path, item)
  199. else:
  200. raise Exception('Unknown archive item type %r' % item['mode'])
  201. def restore_attrs(self, path, item, symlink=False):
  202. xattrs = item.get('xattrs')
  203. if xattrs:
  204. xa = xattr(path, XATTR_NOFOLLOW)
  205. for k, v in xattrs.items():
  206. try:
  207. xa.set(k, v)
  208. except (IOError, KeyError):
  209. pass
  210. if have_lchmod:
  211. os.lchmod(path, item['mode'])
  212. elif not symlink:
  213. os.chmod(path, item['mode'])
  214. uid = gid = None
  215. if not self.numeric_owner:
  216. uid = user2uid(item['user'])
  217. gid = group2gid(item['group'])
  218. uid = uid or item['uid']
  219. gid = gid or item['gid']
  220. try:
  221. os.lchown(path, uid, gid)
  222. except OSError:
  223. pass
  224. if not symlink:
  225. # FIXME: We should really call futimes here (c extension required)
  226. os.utime(path, (item['mtime'], item['mtime']))
  227. def verify_file(self, item, start, result):
  228. if not item['chunks']:
  229. start(item)
  230. result(item, True)
  231. else:
  232. start(item)
  233. ids = [id for id, size, csize in item['chunks']]
  234. try:
  235. for id, chunk in izip(ids, self.store.get_many(ids)):
  236. self.key.decrypt(id, chunk)
  237. except Exception:
  238. result(item, False)
  239. return
  240. result(item, True)
    def delete(self, cache):
        """Remove this archive: decrement every chunk reference it holds,
        then drop it from the manifest and commit.

        NOTE(review): refcounting goes through ``self.cache`` while the
        final commit uses the *cache* argument -- presumably the same
        object; verify against callers.
        """
        unpacker = msgpack.Unpacker()
        for id in self.metadata['items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    # Drop one reference per data chunk of the item.
                    for chunk_id, size, csize in item['chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    # Item without 'chunks' (directory, symlink, ...).
                    pass
            # Drop the reference on the items-metadata chunk itself.
            self.cache.chunk_decref(id)
        # Finally release the archive metadata chunk and unregister.
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.store.commit()
        cache.commit()
    def stat_attrs(self, st, path):
        """Build the common metadata dict (mode/owner/mtime/xattrs) for a
        stat result *st* of *path*."""
        item = {
            'mode': st.st_mode,
            'uid': st.st_uid, 'user': uid2user(st.st_uid),
            'gid': st.st_gid, 'group': gid2group(st.st_gid),
            'mtime': st.st_mtime,
        }
        if self.numeric_owner:
            # Numeric-only mode: never record names for later resolution.
            item['user'] = item['group'] = None
        try:
            xa = xattr(path, XATTR_NOFOLLOW)
            xattrs = {}
            for key in xa:
                # Only store the user namespace on Linux
                if linux and not key.startswith('user'):
                    continue
                xattrs[key] = xa[key]
            if xattrs:
                item['xattrs'] = xattrs
        except IOError:
            # Filesystem without xattr support -- skip silently.
            pass
        return item
  279. def process_item(self, path, st):
  280. item = {'path': path.lstrip('/\\:')}
  281. item.update(self.stat_attrs(st, path))
  282. self.add_item(item)
  283. def process_dev(self, path, st):
  284. item = {'path': path.lstrip('/\\:'), 'dev': st.st_dev}
  285. item.update(self.stat_attrs(st, path))
  286. self.add_item(item)
  287. def process_symlink(self, path, st):
  288. source = os.readlink(path)
  289. item = {'path': path.lstrip('/\\:'), 'source': source}
  290. item.update(self.stat_attrs(st, path))
  291. self.add_item(item)
    def process_file(self, path, st, cache):
        """Archive a regular file, reusing cached chunk ids when the file
        is known to be unchanged since the last backup."""
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                # Subsequent link: store a reference to the first occurrence.
                item = self.stat_attrs(st, path)
                item.update({'path': safe_path, 'source': source})
                self.add_item(item)
                return
            else:
                # First occurrence: remember it, then archive it normally.
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path)
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                # for/else: only runs when no chunk was missing.
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
                                      self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            # Remember the chunk list so an unchanged file skips chunking
            # next time.
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {'path': safe_path, 'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
  327. @staticmethod
  328. def list_archives(store, key, manifest, cache=None):
  329. for name, info in manifest.archives.items():
  330. yield Archive(store, key, manifest, name, cache=cache)