from datetime import datetime, timedelta
from getpass import getuser
from itertools import zip_longest
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO
from . import xattr
from .chunker import chunkify
from .helpers import uid2user, user2uid, gid2group, group2gid, \
    Statistics, decode_dict, st_mtime_ns

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
has_mtime_ns = sys.version_info >= (3, 3)
has_lchmod = hasattr(os, 'lchmod')


class ItemIter(object):
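    """Iterate over archive items decoded from a msgpack stream.

    peek() exposes chunk references from upcoming items ahead of
    iteration; consumers hand it to Repository.get_many(), apparently
    as a read-ahead hook for chunk data.
    """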

    def __init__(self, unpacker, filter):
        self.unpacker = iter(unpacker)
        self.filter = filter
        self.stack = []
        self.peeks = 0
        self._peek = None
        self._peek_iter = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.stack:
            item = self.stack.pop(0)
        else:
            self._peek = None
            item = self.get_next()
        self.peeks = max(0, self.peeks - len(item.get(b'chunks', [])))
        return item

    def get_next(self):
        while True:
            n = next(self.unpacker)
            decode_dict(n, (b'path', b'source', b'user', b'group'))
            if not self.filter or self.filter(n):
                return n

    def peek(self):
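        """Return the next chunk reference from upcoming items, queueing
        those items on self.stack for later iteration.

        Peeking stops (StopIteration) once more than 100 chunk references
        are outstanding; __next__() decrements the counter as peeked
        items are consumed.
        """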
        while True:
            while not self._peek or not self._peek_iter:
                if self.peeks > 100:
                    raise StopIteration
                self._peek = self.get_next()
                self.stack.append(self._peek)
                if b'chunks' in self._peek:
                    self._peek_iter = iter(self._peek[b'chunks'])
                else:
                    self._peek_iter = None
            try:
                item = next(self._peek_iter)
                self.peeks += 1
                return item
            except StopIteration:
                self._peek = None


class Archive(object):
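    """A named archive in a repository.

    Item metadata is msgpack-encoded, chunked and stored just like file
    data; the resulting chunk ids are referenced from a top-level
    archive metadata chunk, which the manifest maps by name.

    A rough usage sketch (repository, key, manifest and cache are
    provided by the surrounding code base):

        archive = Archive(repository, key, manifest, 'docs-2013-01-01',
                          cache=cache, create=True)
        archive.process_file('some/file', os.lstat('some/file'), cache)
        archive.save()
    """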

    class DoesNotExist(Exception):
        pass

    class AlreadyExists(Exception):
        pass

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.items = BytesIO()
        self.items_ids = []
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
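            # Pick the first unused checkpoint name: name.checkpoint,
            # name.checkpoint.1, name.checkpoint.2, ...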
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
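        """Fetch and decrypt the archive metadata chunk identified by id."""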
        self.id = id
        data = self.key.decrypt(self.id, self.repository.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape')
                                     for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None):
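        """Yield (item, peek) pairs for the items in this archive.

        Item metadata chunks are fetched in batches of 20, decrypted and
        fed to a streaming unpacker; the peek callable lets consumers
        prefetch the file chunks of upcoming items.
        """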
        unpacker = msgpack.Unpacker(use_list=False)
        i = 0
        n = 20
        while True:
            items = self.metadata[b'items'][i:i + n]
            i += n
            if not items:
                break
            for id, chunk in zip_longest(items, self.repository.get_many(items)):
                unpacker.feed(self.key.decrypt(id, chunk))
            it = ItemIter(unpacker, filter)
            for item in it:
                yield item, it.peek

    def add_item(self, item):
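        """Append a packed item to the metadata buffer, checkpointing
        and flushing as the configured thresholds are reached."""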
        self.items.write(msgpack.packb(item, unicode_errors='surrogateescape'))
        now = time.time()
        if now - self.last_checkpoint > self.checkpoint_interval:
            self.last_checkpoint = now
            self.write_checkpoint()
        if self.items.tell() > ITEMS_BUFFER:
            self.flush_items()

    def flush_items(self, flush=False):
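        """Chunk the buffered item metadata and store the chunks.

        Unless flush is True, the last (possibly partial) chunk is
        written back to the buffer so it can be re-chunked together
        with data added later.
        """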
        if self.items.tell() == 0:
            return
        self.items.seek(0)
        chunks = list(bytes(s) for s in chunkify(self.items, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.items.seek(0)
        self.items.truncate()
        for chunk in chunks[:-1]:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.items_ids.append(id)
        if flush or len(chunks) == 1:
            id, _, _ = self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1], self.stats)
            self.items_ids.append(id)
        else:
            self.items.write(chunks[-1])

    def write_checkpoint(self):
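        """Commit the current state under the checkpoint name, then drop
        that entry again so the next checkpoint or the final save can
        take its place."""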
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)

    def save(self, name=None):
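        """Flush any buffered items and commit the archive metadata,
        the manifest, the repository and the cache."""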
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.flush_items(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_ids,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
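        """Calculate archive statistics by walking all referenced chunks
        inside a cache transaction that is rolled back afterwards."""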
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize
        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards.
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip_longest(self.metadata[b'items'],
                                     self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                try:
                    for id, size, csize in item[b'chunks']:
                        add(id)
                    stats.nfiles += 1
                except KeyError:
                    pass
        cache.rollback()
        return stats

    def extract_item(self, item, restore_attrs=True, peek=None):
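        """Recreate the filesystem object described by item below self.cwd."""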
        dest = self.cwd
        assert item[b'path'][:1] not in ('/', '\\', ':')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove an existing file or directory at the target
        # path, ignoring errors
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [id for id, size, csize in item[b'chunks']]
                    for id, chunk in zip_longest(ids, self.repository.get_many(ids, peek)):
                        data = self.key.decrypt(id, chunk)
                        fd.write(data)
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])

    def restore_attrs(self, path, item, symlink=False, fd=None):
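        """Restore xattrs, ownership, permissions and mtime, using the
        fd-based os calls where available."""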
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                xattr.set(fd or path, k, v)
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to OS-specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))

    def verify_file(self, item, start, result, peek=None):
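        """Check that all chunks of a file decrypt correctly, reporting
        progress through the start and result callbacks."""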
        if not item[b'chunks']:
            start(item)
            result(item, True)
        else:
            start(item)
            ids = [id for id, size, csize in item[b'chunks']]
            try:
                for id, chunk in zip_longest(ids, self.repository.get_many(ids, peek)):
                    self.key.decrypt(id, chunk)
            except Exception:
                result(item, False)
                return
            result(item, True)

    def delete(self, cache):
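        """Drop this archive: decrement refcounts on every referenced
        chunk and remove the manifest entry."""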
        unpacker = msgpack.Unpacker(use_list=False)
        for id in self.metadata[b'items']:
            unpacker.feed(self.key.decrypt(id, self.repository.get(id)))
            for item in unpacker:
                try:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id)
                except KeyError:
                    pass
            self.cache.chunk_decref(id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.repository.commit()
        cache.commit()

    def stat_attrs(self, st, path):
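        """Build the common item attributes (mode, owner, mtime and, when
        readable, xattrs) from a stat result."""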
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': st_mtime_ns(st),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        try:
            xattrs = xattr.get_all(path)
            if xattrs:
                item[b'xattrs'] = xattrs
        except PermissionError:
            pass
        return item

    def process_item(self, path, st):
        item = {b'path': path.lstrip('/\\:')}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': path.lstrip('/\\:'), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': path.lstrip('/\\:'), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
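        """Archive a regular file, reusing cached chunk ids when the file
        is known and unchanged, and chunking its contents otherwise."""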
        safe_path = path.lstrip('/\\:')
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id in ids:
                if not cache.seen_chunk(id):
                    break
            else:
                chunks = [cache.chunk_incref(id, self.stats) for id in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            ids = [id for id, _, _ in chunks]
            cache.memorize_file(path_hash, st, ids)
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)