archive.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. from datetime import datetime, timedelta
  2. from getpass import getuser
  3. from itertools import zip_longest
  4. import msgpack
  5. import os
  6. import socket
  7. import stat
  8. import sys
  9. import time
  10. from io import BytesIO
  11. import xattr
  12. from .chunker import chunkify
  13. from .helpers import uid2user, user2uid, gid2group, group2gid, \
  14. Statistics, decode_dict, st_mtime_ns
# Flush the in-memory item-metadata buffer to the repository once it
# exceeds this many bytes (see Archive.add_item / flush_items).
ITEMS_BUFFER = 1024 * 1024
# Chunker parameters: minimum chunk size, rolling-hash window size, and
# the mask applied to the rolling hash to detect chunk boundaries.
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff
# Feature detection: can os.utime() take a file descriptor? (Python >= 3.3)
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
# Nanosecond mtime support was added in Python 3.3.
has_mtime_ns = sys.version >= '3.3'
# os.lchmod is not available on Linux.
has_lchmod = hasattr(os, 'lchmod')
  22. class ItemIter(object):
  23. def __init__(self, unpacker, filter):
  24. self.unpacker = iter(unpacker)
  25. self.filter = filter
  26. self.stack = []
  27. self.peeks = 0
  28. self._peek = None
  29. self._peek_iter = None
  30. global foo
  31. foo = self
  32. def __iter__(self):
  33. return self
  34. def __next__(self):
  35. if self.stack:
  36. item = self.stack.pop(0)
  37. else:
  38. self._peek = None
  39. item = self.get_next()
  40. self.peeks = max(0, self.peeks - len(item.get(b'chunks', [])))
  41. return item
  42. def get_next(self):
  43. while True:
  44. n = next(self.unpacker)
  45. decode_dict(n, (b'path', b'source', b'user', b'group'))
  46. if not self.filter or self.filter(n):
  47. return n
  48. def peek(self):
  49. while True:
  50. while not self._peek or not self._peek_iter:
  51. if self.peeks > 100:
  52. raise StopIteration
  53. self._peek = self.get_next()
  54. self.stack.append(self._peek)
  55. if b'chunks' in self._peek:
  56. self._peek_iter = iter(self._peek[b'chunks'])
  57. else:
  58. self._peek_iter = None
  59. try:
  60. item = next(self._peek_iter)
  61. self.peeks += 1
  62. return item
  63. except StopIteration:
  64. self._peek = None
class Archive(object):
    """A named backup archive stored in a repository."""

    class DoesNotExist(Exception):
        # Raised when opening an archive whose name is not in the manifest.
        pass

    class AlreadyExists(Exception):
        # Raised when creating/saving an archive under a taken name.
        pass
  70. def __init__(self, repository, key, manifest, name, cache=None, create=False,
  71. checkpoint_interval=300, numeric_owner=False):
  72. self.cwd = os.getcwd()
  73. self.key = key
  74. self.repository = repository
  75. self.cache = cache
  76. self.manifest = manifest
  77. self.items = BytesIO()
  78. self.items_ids = []
  79. self.hard_links = {}
  80. self.stats = Statistics()
  81. self.name = name
  82. self.checkpoint_interval = checkpoint_interval
  83. self.numeric_owner = numeric_owner
  84. if create:
  85. if name in manifest.archives:
  86. raise self.AlreadyExists(name)
  87. self.last_checkpoint = time.time()
  88. i = 0
  89. while True:
  90. self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
  91. if not self.checkpoint_name in manifest.archives:
  92. break
  93. i += 1
  94. else:
  95. if name not in self.manifest.archives:
  96. raise self.DoesNotExist(name)
  97. info = self.manifest.archives[name]
  98. self.load(info[b'id'])
  99. def load(self, id):
  100. self.id = id
  101. data = self.key.decrypt(self.id, self.repository.get(self.id))
  102. self.metadata = msgpack.unpackb(data)
  103. if self.metadata[b'version'] != 1:
  104. raise Exception('Unknown archive metadata version')
  105. decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
  106. self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
  107. self.name = self.metadata[b'name']
  108. @property
  109. def ts(self):
  110. """Timestamp of archive creation in UTC"""
  111. t, f = self.metadata[b'time'].split('.', 1)
  112. return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
  113. def __repr__(self):
  114. return 'Archive(%r)' % self.name
  115. def iter_items(self, filter=None):
  116. unpacker = msgpack.Unpacker(use_list=False)
  117. i = 0
  118. n = 20
  119. while True:
  120. items = self.metadata[b'items'][i:i + n]
  121. i += n
  122. if not items:
  123. break
  124. for id, chunk in [(id, chunk) for id, chunk in zip_longest(items, self.repository.get_many(items))]:
  125. unpacker.feed(self.key.decrypt(id, chunk))
  126. iter = ItemIter(unpacker, filter)
  127. for item in iter:
  128. yield item, iter.peek
  129. def add_item(self, item):
  130. self.items.write(msgpack.packb(item, unicode_errors='surrogateescape'))
  131. now = time.time()
  132. if now - self.last_checkpoint > self.checkpoint_interval:
  133. self.last_checkpoint = now
  134. self.write_checkpoint()
  135. if self.items.tell() > ITEMS_BUFFER:
  136. self.flush_items()
  137. def flush_items(self, flush=False):
  138. if self.items.tell() == 0:
  139. return
  140. self.items.seek(0)
  141. chunks = list(bytes(s) for s in chunkify(self.items, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
  142. self.items.seek(0)
  143. self.items.truncate()
  144. for chunk in chunks[:-1]:
  145. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
  146. self.items_ids.append(id)
  147. if flush or len(chunks) == 1:
  148. id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
  149. self.items_ids.append(id)
  150. else:
  151. self.items.write(chunks[-1])
    def write_checkpoint(self):
        """Persist the partial archive under the checkpoint name.

        save() commits repository and cache; the checkpoint entry is then
        dropped from the in-memory manifest and its metadata chunk
        dereferenced so the next save()/checkpoint can reuse the name.
        """
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)
  156. def save(self, name=None):
  157. name = name or self.name
  158. if name in self.manifest.archives:
  159. raise self.AlreadyExists(name)
  160. self.flush_items(flush=True)
  161. metadata = {
  162. 'version': 1,
  163. 'name': name,
  164. 'items': self.items_ids,
  165. 'cmdline': sys.argv,
  166. 'hostname': socket.gethostname(),
  167. 'username': getuser(),
  168. 'time': datetime.utcnow().isoformat(),
  169. }
  170. data = msgpack.packb(metadata, unicode_errors='surrogateescape')
  171. self.id = self.key.id_hash(data)
  172. self.cache.add_chunk(self.id, data, self.stats)
  173. self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
  174. self.manifest.write()
  175. self.repository.commit()
  176. self.cache.commit()
    def calc_stats(self, cache):
        """Compute size / compressed-size / unique statistics for this archive.

        Temporarily decrements chunk refcounts inside a cache transaction
        to detect chunks unique to this archive (count == 1 at decrement
        time), then rolls the transaction back.
        """
        def add(id):
            # A refcount of 1 means this archive holds the only reference,
            # i.e. the chunk's size counts as "unique".
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)  # the archive metadata chunk itself
        for id, chunk in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)  # the item-metadata chunk
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                try:
                    for id, size, csize in item[b'chunks']:
                        add(id)
                    # Only items with a chunk list count as files.
                    stats.nfiles += 1
                except KeyError:
                    # Item has no b'chunks' (e.g. directory, symlink).
                    pass
        cache.rollback()
        return stats
    def extract_item(self, item, dest=None, restore_attrs=True, peek=None):
        """Recreate the filesystem object described by *item* under *dest*.

        Handles directories, regular files (including hard links), FIFOs,
        symlinks and char/block devices; restores attributes afterwards
        unless *restore_attrs* is False. *peek* is forwarded to
        repository.get_many for chunk pre-fetching.
        """
        dest = dest or self.cwd
        # NOTE(review): `assert` is stripped under -O; this absolute-path
        # guard would be safer as an explicit check + raise.
        assert item[b'path'][:1] not in ('/', '\\', ':')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link? b'source' points at the first extracted copy.
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [id for id, size, csize in item[b'chunks']]
                    for id, chunk in zip_longest(ids, self.repository.get_many(ids, peek)):
                        data = self.key.decrypt(id, chunk)
                        fd.write(data)
                    # Restore attrs through the still-open fd.
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
    def restore_attrs(self, path, item, symlink=False, fd=None):
        """Restore xattrs, ownership, mode and mtime onto *path* (or *fd*).

        xattr and chown failures are deliberately ignored (best effort,
        e.g. when extracting as a non-root user).
        """
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                try:
                    xattr.set(fd or path, k, v)
                except (EnvironmentError):
                    pass
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        # NOTE(review): `or` treats uid/gid 0 (root) the same as a failed
        # name lookup and falls back to the stored numeric id -- confirm
        # this is the intended behavior.
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        # b'mtime' is stored in nanoseconds; ns= needs Python >= 3.3.
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))
  287. def verify_file(self, item, start, result, peek=None):
  288. if not item[b'chunks']:
  289. start(item)
  290. result(item, True)
  291. else:
  292. start(item)
  293. ids = [id for id, size, csize in item[b'chunks']]
  294. try:
  295. for id, chunk in zip_longest(ids, self.repository.get_many(ids, peek)):
  296. self.key.decrypt(id, chunk)
  297. except Exception:
  298. result(item, False)
  299. return
  300. result(item, True)
    def delete(self, cache):
        """Remove this archive: dereference all file-data chunks, all
        item-metadata chunks and the archive-metadata chunk, drop the
        manifest entry, then commit repository and cache.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for id in self.metadata[b'items']:
            unpacker.feed(self.key.decrypt(id, self.repository.get(id)))
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    # Item without b'chunks' (directory, symlink, ...).
                    pass
            self.cache.chunk_decref(id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.repository.commit()
        # NOTE(review): decrefs go through self.cache but the commit uses
        # the *cache* argument -- presumably always the same object; verify.
        cache.commit()
  317. def stat_attrs(self, st, path):
  318. item = {
  319. b'mode': st.st_mode,
  320. b'uid': st.st_uid, b'user': uid2user(st.st_uid),
  321. b'gid': st.st_gid, b'group': gid2group(st.st_gid),
  322. b'mtime': st_mtime_ns(st),
  323. }
  324. if self.numeric_owner:
  325. item[b'user'] = item[b'group'] = None
  326. try:
  327. xattrs = xattr.get_all(path, True)
  328. if xattrs:
  329. item[b'xattrs'] = dict(xattrs)
  330. except EnvironmentError:
  331. pass
  332. return item
  333. def process_item(self, path, st):
  334. item = {b'path': path.lstrip('/\\:')}
  335. item.update(self.stat_attrs(st, path))
  336. self.add_item(item)
  337. def process_dev(self, path, st):
  338. item = {b'path': path.lstrip('/\\:'), b'rdev': st.st_rdev}
  339. item.update(self.stat_attrs(st, path))
  340. self.add_item(item)
  341. def process_symlink(self, path, st):
  342. source = os.readlink(path)
  343. item = {b'path': path.lstrip('/\\:'), b'source': source}
  344. item.update(self.stat_attrs(st, path))
  345. self.add_item(item)
  346. def process_file(self, path, st, cache):
  347. safe_path = path.lstrip('/\\:')
  348. # Is it a hard link?
  349. if st.st_nlink > 1:
  350. source = self.hard_links.get((st.st_ino, st.st_dev))
  351. if (st.st_ino, st.st_dev) in self.hard_links:
  352. item = self.stat_attrs(st, path)
  353. item.update({b'path': safe_path, b'source': source})
  354. self.add_item(item)
  355. return
  356. else:
  357. self.hard_links[st.st_ino, st.st_dev] = safe_path
  358. path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
  359. ids = cache.file_known_and_unchanged(path_hash, st)
  360. chunks = None
  361. if ids is not None:
  362. # Make sure all ids are available
  363. for id in ids:
  364. if not cache.seen_chunk(id):
  365. break
  366. else:
  367. chunks = [cache.chunk_incref(id, self.stats) for id in ids]
  368. # Only chunkify the file if needed
  369. if chunks is None:
  370. with open(path, 'rb') as fd:
  371. chunks = []
  372. for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
  373. chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
  374. ids = [id for id, _, _ in chunks]
  375. cache.memorize_file(path_hash, st, ids)
  376. item = {b'path': safe_path, b'chunks': chunks}
  377. item.update(self.stat_attrs(st, path))
  378. self.stats.nfiles += 1
  379. self.add_item(item)
  380. @staticmethod
  381. def list_archives(repository, key, manifest, cache=None):
  382. for name, info in manifest.archives.items():
  383. yield Archive(repository, key, manifest, name, cache=cache)