from datetime import datetime, timedelta
from getpass import getuser
from itertools import zip_longest
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
import xattr

from .chunker import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, \
    Statistics, decode_dict

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
has_lchmod = hasattr(os, 'lchmod')
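

# ItemIter wraps a streaming msgpack unpacker.  peek() walks ahead through
# the b'chunks' lists of upcoming items (keeping at most about 100 chunk
# references outstanding) so the store can prefetch data; items read ahead
# are parked on a stack and replayed by __next__, so nothing is skipped.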
class ItemIter(object):

    def __init__(self, unpacker, filter):
        self.unpacker = iter(unpacker)
        self.filter = filter
        self.stack = []
        self.peeks = 0
        self._peek = None
        self._peek_iter = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.stack:
            item = self.stack.pop(0)
        else:
            self._peek = None
            item = self.get_next()
        self.peeks = max(0, self.peeks - len(item.get(b'chunks', [])))
        return item

    def get_next(self):
        n = next(self.unpacker)
        while self.filter and not self.filter(n):
            n = next(self.unpacker)
        return n

    def peek(self):
        while True:
            while not self._peek or not self._peek_iter:
                if self.peeks > 100:
                    raise StopIteration
                self._peek = self.get_next()
                self.stack.append(self._peek)
                if b'chunks' in self._peek:
                    self._peek_iter = iter(self._peek[b'chunks'])
                else:
                    self._peek_iter = None
            try:
                item = next(self._peek_iter)
                self.peeks += 1
                return item
            except StopIteration:
                self._peek = None
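

# An Archive ties together the object store, the encryption key and the
# manifest.  Items (files, directories, symlinks, ...) are serialized with
# msgpack into an in-memory buffer and written to the store as
# content-defined chunks; the archive metadata records the ids of those
# item chunks.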
class Archive(object):

    class DoesNotExist(Exception):
        pass

    class AlreadyExists(Exception):
        pass

    def __init__(self, store, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.store = store
        self.cache = cache
        self.manifest = manifest
        self.items = BytesIO()
        self.items_ids = []
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
        self.id = id
        data = self.key.decrypt(self.id, self.store.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name
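
    # Archive items are stored as a stream of msgpack-encoded chunks whose
    # ids are listed in the archive metadata.  To keep memory usage bounded,
    # iter_items decrypts and feeds them to the unpacker 20 at a time and
    # yields each item together with a peek callable that lets the store
    # prefetch upcoming data chunks.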
    def iter_items(self, filter=None):
        unpacker = msgpack.Unpacker(use_list=False)
        i = 0
        n = 20
        while True:
            items = self.metadata[b'items'][i:i + n]
            i += n
            if not items:
                break
            for id, chunk in list(zip_longest(items, self.store.get_many(items))):
                unpacker.feed(self.key.decrypt(id, chunk))
            it = ItemIter(unpacker, filter)
            for item in it:
                decode_dict(item, (b'path', b'source', b'user', b'group'))
                yield item, it.peek
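
    # add_item appends a serialized item to the in-memory buffer.  A
    # checkpoint archive is written at most every checkpoint_interval
    # seconds so an interrupted run loses little work, and the buffer is
    # flushed to the store once it exceeds ITEMS_BUFFER bytes.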
    def add_item(self, item):
        self.items.write(msgpack.packb(item, unicode_errors='surrogateescape'))
        now = time.time()
        if now - self.last_checkpoint > self.checkpoint_interval:
            self.last_checkpoint = now
            self.write_checkpoint()
        if self.items.tell() > ITEMS_BUFFER:
            self.flush_items()
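
    # flush_items runs the buffered items through the chunker and stores
    # the resulting chunks.  The final chunk is written back into the
    # buffer, since it may still grow as more items arrive, unless
    # flush=True or it is the only chunk.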
    def flush_items(self, flush=False):
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        chunks = list(bytes(s) for s in chunkify(self.items, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            self.items.write(chunks[-1])

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)
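
    # save finalizes the archive: the remaining items are flushed, the
    # metadata is packed and stored under its own id hash, and the archive
    # is registered in the manifest before store and cache are committed.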
    def save(self, name=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.store.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip_longest(self.metadata[b'items'], self.store.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                try:
                    for id, size, csize in item[b'chunks']:
                        add(id)
                    stats.nfiles += 1
                except KeyError:
                    pass
        cache.rollback()
        return stats
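
    # extract_item recreates a single archive entry below dest, dispatching
    # on the stored st_mode: directories, regular files (including hard
    # links), FIFOs, symlinks and device nodes are handled; anything else
    # is an error.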
    def extract_item(self, item, dest=None, restore_attrs=True, peek=None):
        dest = dest or self.cwd
        assert item[b'path'][:1] not in ('/', '\\', ':')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [id for id, size, csize in item[b'chunks']]
                    for id, chunk in zip_longest(ids, self.store.get_many(ids, peek)):
                        data = self.key.decrypt(id, chunk)
                        fd.write(data)
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
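
    # restore_attrs applies xattrs, ownership, mode and mtime.  When an
    # open file descriptor is available the f* variants are used; symlinks
    # need the l* variants, which are not uniformly available across
    # platforms.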
    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.set(fd or path, k, v)
                except EnvironmentError:
                    pass
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, (item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, (item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'], item[b'mtime']))

    def verify_file(self, item, start, result, peek=None):
        if not item[b'chunks']:
            start(item)
            result(item, True)
        else:
            start(item)
            ids = [id for id, size, csize in item[b'chunks']]
            try:
                for id, chunk in zip_longest(ids, self.store.get_many(ids, peek)):
                    self.key.decrypt(id, chunk)
            except Exception:
                result(item, False)
                return
            result(item, True)
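
    # delete drops every reference held by this archive: one decref per
    # data chunk of each item, one per items chunk and one for the metadata
    # chunk, then removes the archive from the manifest.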
    def delete(self, cache):
        unpacker = msgpack.Unpacker(use_list=False)
        for id in self.metadata[b'items']:
            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    pass
            self.cache.chunk_decref(id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.store.commit()
        cache.commit()

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': st.st_mtime,
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        try:
            xattrs = xattr.get_all(path, True)
            if xattrs:
                item[b'xattrs'] = dict(xattrs)
        except EnvironmentError:
            pass
        return item
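
    # The process_* methods build item dicts for the different file types
    # encountered during a backup run and hand them to add_item.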
    def process_item(self, path, st):
        item = {b'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': path.lstrip('/\\:'), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': path.lstrip('/\\:'), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)
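
    # process_file is the hot path of a backup: a hard link is stored as a
    # reference to its first occurrence, and a file the files cache reports
    # as unchanged only has its existing chunks' refcounts bumped instead
    # of being read and chunked again.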
    def process_file(self, path, st, cache):
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(path.encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(store, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(store, key, manifest, name, cache=cache)
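

# A rough usage sketch (assuming `store`, `key`, `manifest` and `cache`
# objects have been set up by the surrounding application; their
# constructors live in sibling modules and are not shown here):
#
#     archive = Archive(store, key, manifest, 'mybackup', cache=cache,
#                       create=True)
#     archive.process_file('/etc/hostname', os.lstat('/etc/hostname'), cache)
#     archive.save()
#
#     for item, peek in Archive(store, key, manifest, 'mybackup').iter_items():
#         print(item[b'path'])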