from datetime import datetime, timedelta, timezone
from getpass import getuser
from itertools import zip_longest
import msgpack
import os
import socket
import stat
import sys
import time
from io import BytesIO

from attic import xattr
from attic.chunker import chunkify
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
    Statistics, decode_dict, st_mtime_ns, make_path_safe

ITEMS_BUFFER = 1024 * 1024
CHUNK_MIN = 1024
WINDOW_SIZE = 0xfff
CHUNK_MASK = 0xffff

utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
has_mtime_ns = sys.version >= '3.3'
has_lchmod = hasattr(os, 'lchmod')


class DownloadPipeline:
    """Fetch, decrypt, and unpack archive item streams from the repository."""

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [decode_dict(item, (b'path', b'source', b'user', b'group')) for item in unpacker]
            if filter:
                items = [item for item in items if filter(item)]
            if preload:
                for item in items:
                    if b'chunks' in item:
                        self.repository.preload([c[0] for c in item[b'chunks']])
            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip_longest(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)
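
# Usage sketch (illustrative, not part of the original module): callers
# normally reach this class through Archive.iter_items(), roughly:
#
#     pipeline = DownloadPipeline(repository, key)
#     for item in pipeline.unpack_many(item_ids, preload=True):
#         ...  # each item is a decoded metadata dict
#
# preload=True queues the items' data chunks in the repository so a later
# fetch_many(ids, is_preloaded=True) can stream them efficiently.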


class ChunkBuffer:
    """Buffer packed archive items and cut them into content-defined chunks."""

    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, cache, key, stats):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.cache = cache
        self.chunks = []
        self.key = key
        self.stats = stats

    def add(self, item):
        self.buffer.write(self.packer.pack(item))

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        chunks = list(bytes(s) for s in chunkify(self.buffer, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
            self.chunks.append(id_)
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE
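
# Buffer lifecycle sketch (collaborators illustrative):
#
#     buf = ChunkBuffer(cache, key, stats)
#     buf.add({b'path': b'etc/passwd'})  # msgpack-packed into the buffer
#     buf.flush()             # stores all but the trailing partial chunk
#                             # (unless it is the only one)
#     buf.flush(flush=True)   # at save time, stores the remainder too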


class Archive:

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=300, numeric_owner=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics()
        self.name = name
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.items_buffer = ChunkBuffer(self.cache, self.key, self.stats)
        self.pipeline = DownloadPipeline(self.repository, self.key)
        if create:
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.time()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            if name not in self.manifest.archives:
                raise self.DoesNotExist(name)
            info = self.manifest.archives[name]
            self.load(info[b'id'])

    def load(self, id):
        self.id = id
        data = self.key.decrypt(self.id, self.repository.get(self.id))
        self.metadata = msgpack.unpackb(data)
        if self.metadata[b'version'] != 1:
            raise Exception('Unknown archive metadata version')
        decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time'))
        self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
        self.name = self.metadata[b'name']

    @property
    def ts(self):
        """Timestamp of archive creation in UTC"""
        t, f = self.metadata[b'time'].split('.', 1)
        return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc) + timedelta(seconds=float('.' + f))
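
    # Comment sketch (value illustrative): save() stores
    # datetime.utcnow().isoformat(), e.g. '2014-01-06T12:34:56.789012';
    # ts parses the integral part with strptime() and re-adds the
    # fractional seconds as a timedelta, yielding an aware UTC datetime.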

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def iter_items(self, filter=None, preload=False):
        for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
            yield item

    def add_item(self, item):
        self.items_buffer.add(item)
        now = time.time()
        if now - self.last_checkpoint > self.checkpoint_interval:
            self.last_checkpoint = now
            self.write_checkpoint()
        if self.items_buffer.is_full():
            self.items_buffer.flush()

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id)

    def save(self, name=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        metadata = {
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': datetime.utcnow().isoformat(),
        }
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
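
    # End-to-end creation sketch (illustrative; repository, key, manifest
    # and cache are assumed to be set up by the caller, e.g. the attic CLI):
    #
    #     archive = Archive(repository, key, manifest, 'docs-2014-01-06',
    #                       cache=cache, create=True)
    #     st = os.lstat('home/user/notes.txt')
    #     archive.process_file('home/user/notes.txt', st, cache)
    #     archive.save()  # flushes the item buffer and commits everything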

    def calc_stats(self, cache):
        def add(id):
            count, size, csize = self.cache.chunks[id]
            stats.update(size, csize, count == 1)
            self.cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)
        for id, chunk in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
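
    # Why the rollback trick works (descriptive note): decrementing each
    # chunk's refcount inside a throwaway cache transaction makes
    # `count == 1` identify chunks referenced only by this archive, so
    # their size counts as unique; rollback() then undoes the decrements.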

    def extract_item(self, item, restore_attrs=True):
        dest = self.cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignoring errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        fd.write(data)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            os.symlink(source, path)
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
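
    # Extraction sketch (illustrative): iter_items(preload=True) pairs with
    # extract_item(), since regular files then read their chunks through
    # fetch_many(..., is_preloaded=True):
    #
    #     for item in archive.iter_items(preload=True):
    #         archive.extract_item(item)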

    def restore_attrs(self, path, item, symlink=False, fd=None):
        xattrs = item.get(b'xattrs')
        if xattrs:
            for k, v in xattrs.items():
                xattr.setxattr(fd or path, k, v)
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = uid or item[b'uid']
        gid = gid or item[b'gid']
        # This code is a bit of a mess due to OS-specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        if fd and utime_supports_fd:  # Python >= 3.3
            os.utime(fd, None, ns=(item[b'mtime'], item[b'mtime']))
        elif utime_supports_fd:  # Python >= 3.3
            os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
        elif not symlink:
            os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))
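
    # Note on mtime (descriptive): items record mtime in nanoseconds via
    # st_mtime_ns(), so on Python >= 3.3 os.utime(..., ns=...) restores it
    # exactly; the last fallback divides by 10**9 to pass float seconds.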

    def verify_file(self, item, start, result):
        if not item[b'chunks']:
            start(item)
            result(item, True)
        else:
            start(item)
            ids = [id for id, size, csize in item[b'chunks']]
            try:
                for _ in self.pipeline.fetch_many(ids, is_preloaded=True):
                    pass
            except Exception:
                result(item, False)
                return
            result(item, True)

    def delete(self, cache):
        unpacker = msgpack.Unpacker(use_list=False)
        for id_, data in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            unpacker.feed(self.key.decrypt(id_, data))
            self.cache.chunk_decref(id_)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id)
        self.cache.chunk_decref(self.id)
        del self.manifest.archives[self.name]
        self.manifest.write()
        self.repository.commit()
        cache.commit()

    def stat_attrs(self, st, path):
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            b'mtime': st_mtime_ns(st),
        }
        if self.numeric_owner:
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = xattrs
        return item
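
    # Shape of the resulting item dict (comment sketch, values illustrative):
    #
    #     {b'mode': 0o100644, b'uid': 1000, b'user': 'ts',
    #      b'gid': 1000, b'group': 'ts', b'mtime': 1389011696123456789}
    #
    # plus b'xattrs' when present; the process_*() methods below then add
    # b'path' and type-specific keys such as b'chunks', b'source' or b'rdev'.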

    def process_item(self, path, st):
        item = {b'path': make_path_safe(path)}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_dev(self, path, st):
        item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_symlink(self, path, st):
        source = os.readlink(path)
        item = {b'path': make_path_safe(path), b'source': source}
        item.update(self.stat_attrs(st, path))
        self.add_item(item)

    def process_file(self, path, st, cache):
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                return
            else:
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        ids = cache.file_known_and_unchanged(path_hash, st)
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
        # Only chunkify the file if needed
        if chunks is None:
            with open(path, 'rb') as fd:
                chunks = []
                for chunk in chunkify(fd, WINDOW_SIZE, CHUNK_MASK, CHUNK_MIN, self.key.chunk_seed):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
        item = {b'path': safe_path, b'chunks': chunks}
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
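
    # Change detection (descriptive note): file_known_and_unchanged() returns
    # the chunk ids recorded for a file whose stat data is unchanged (or
    # None), so unchanged files are re-referenced via chunk_incref() without
    # being read; only new or modified files are chunkified again.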

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        for name, info in manifest.archives.items():
            yield Archive(repository, key, manifest, name, cache=cache)
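
# Listing sketch (illustrative):
#
#     for archive in Archive.list_archives(repository, key, manifest):
#         print(archive.name, archive.ts)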