cache.py
from configparser import RawConfigParser
from .remote import cache_if_remote
import errno
import msgpack
import os
import sys
from binascii import hexlify
import shutil
import tarfile
import tempfile

from .key import PlaintextKey
from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \
    bigint_to_int
from .locking import UpgradableLock
from .hashindex import ChunkIndex


class Cache:
    """Client Side cache
    """

    class RepositoryReplay(Error):
        """Cache is newer than repository, refusing to continue"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue
        """

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True):
        self.lock = None
        self.timestamp = None
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        self.do_files = do_files
        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            if warn_if_unencrypted and isinstance(key, PlaintextKey):
                if not self._confirm('Warning: Attempting to access a previously unknown unencrypted repository',
                                     'BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                    raise self.CacheInitAbortedError()
            self.create()
        self.open()
        # Warn user before sending data to a relocated repository
        if self.previous_location and self.previous_location != repository._location.canonical_path():
            msg = 'Warning: The repository at location {} was previously located at {}'.format(repository._location.canonical_path(), self.previous_location)
            if not self._confirm(msg, 'BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise self.RepositoryAccessAborted()
        if sync and self.manifest.id != self.manifest_id:
            # If repository is older than the cache something fishy is going on
            if self.timestamp and self.timestamp > manifest.timestamp:
                raise self.RepositoryReplay()
            # Make sure an encrypted repository has not been swapped for an unencrypted repository
            if self.key_type is not None and self.key_type != str(key.TYPE):
                raise self.EncryptionMethodMismatch()
            self.sync()
            self.commit()

    def __del__(self):
        self.close()

    def _confirm(self, message, env_var_override=None):
        print(message, file=sys.stderr)
        if env_var_override and os.environ.get(env_var_override):
            print("Yes (From {})".format(env_var_override))
            return True
        if not sys.stdin.isatty():
            return False
        try:
            answer = input('Do you want to continue? [yN] ')
        except EOFError:
            return False
        return answer and answer in 'Yy'
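
    # On-disk layout of the cache directory (created by create(), locked by open()):
    #   README          - short note identifying the directory as a Borg cache
    #   config          - INI file: version, repository id, manifest id, timestamp, key type, previous location
    #   chunks          - ChunkIndex mapping chunk id -> (refcount, size, csize)
    #   chunks.archive  - compressed tar of per-archive chunk indexes (see sync())
    #   files           - msgpack stream backing the files cache
    #   lock            - lock file used by UpgradableLock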

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write('This is a Borg cache')
        config = RawConfigParser()
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
        config.set('cache', 'manifest', '')
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd:
            pass  # empty file
        with open(os.path.join(self.path, 'files'), 'wb') as fd:
            pass  # empty file

    def destroy(self):
        """destroy the cache at `self.path`
        """
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def _do_open(self):
        self.config = RawConfigParser()
        self.config.read(os.path.join(self.path, 'config'))
        if self.config.getint('cache', 'version') != 1:
            raise Exception('%s Does not look like a Borg cache' % self.path)
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s Does not look like a Borg cache' % self.path)
        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
        self.rollback()

    def close(self):
        if self.lock:
            self.lock.release()
            self.lock = None
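
    # Files cache: self.files maps a path hash to a msgpack-packed entry of the
    # form (age, st_ino, st_size, mtime_ns as bigint, chunk ids), see
    # memorize_file().  The age counter is incremented each time the cache is
    # loaded (_read_files), reset to 0 on a hit (file_known_and_unchanged), and
    # entries are dropped at commit() once the age reaches 10 or the mtime
    # equals the newest mtime seen, to stay safe against mtime precision issues.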

    def _read_files(self):
        self.files = {}
        self._newest_mtime = 0
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    item[0] += 1
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(item)
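
    # Transactions: begin_txn() snapshots config, chunks, chunks.archive and
    # files into txn.tmp and atomically renames it to txn.active.  commit()
    # writes the new state and removes the snapshot; rollback() restores the
    # snapshotted files from txn.active and cleans up any leftover txn.tmp.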

    def begin_txn(self):
        # Initialize transaction snapshot
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir)
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        if self.files is not None:
            with open(os.path.join(self.path, 'files'), 'wb') as fd:
                for path_hash, item in self.files.items():
                    # Discard cached files with the newest mtime to avoid
                    # issues with filesystem snapshots and mtime precision
                    item = msgpack.unpackb(item)
                    if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
                        msgpack.pack((path_hash, item), fd)
        self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        If present, uses a compressed tar archive of known backup archive
        indices, so it only needs to fetch infos from repo and build a chunk
        index once per backup archive.
        If out of sync, the tar gets rebuilt from known + fetched chunk infos,
        so it has complete and current information about all backup archives.
        Finally, it builds the master chunks index by merging all indices from
        the tar.

        Note: compression (esp. xz) is very effective in keeping the tar
              relatively small compared to the files it contains.
        """
        in_archive_path = os.path.join(self.path, 'chunks.archive')
        out_archive_path = os.path.join(self.path, 'chunks.archive.tmp')

        def open_in_archive():
            try:
                tf = tarfile.open(in_archive_path, 'r')
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
                # file not found
                tf = None
            except tarfile.ReadError:
                # empty file?
                tf = None
            return tf

        def open_out_archive():
            for compression in ('xz', 'bz2', 'gz'):
                # xz needs py 3.3, bz2 and gz also work on 3.2
                try:
                    tf = tarfile.open(out_archive_path, 'w:' + compression, format=tarfile.PAX_FORMAT)
                    break
                except tarfile.CompressionError:
                    continue
            else:  # shouldn't happen
                tf = None
            return tf

        def close_archive(tf):
            if tf:
                tf.close()

        def delete_in_archive():
            os.unlink(in_archive_path)

        def rename_out_archive():
            os.rename(out_archive_path, in_archive_path)

        def add(chunk_idx, id, size, csize, incr=1):
            try:
                count, size, csize = chunk_idx[id]
                chunk_idx[id] = count + incr, size, csize
            except KeyError:
                chunk_idx[id] = incr, size, csize
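
        # Note: chunk_idx maps chunk id -> (refcount, size, csize); add() bumps
        # the refcount of a known chunk and inserts unknown chunks with incr.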

        def transfer_known_idx(archive_id, tf_in, tf_out):
            archive_id_hex = hexlify(archive_id).decode('ascii')
            tarinfo = tf_in.getmember(archive_id_hex)
            archive_name = tarinfo.pax_headers['archive_name']
            print('Already known archive:', archive_name)
            f_in = tf_in.extractfile(archive_id_hex)
            tf_out.addfile(tarinfo, f_in)
            return archive_name

        def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out):
            chunk_idx = ChunkIndex()
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            add(chunk_idx, archive_id, len(data), len(cdata))
            archive = msgpack.unpackb(data)
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name',))
            print('Analyzing new archive:', archive[b'name'])
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
                data = key.decrypt(item_id, chunk)
                add(chunk_idx, item_id, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        print('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    if b'chunks' in item:
                        for chunk_id, size, csize in item[b'chunks']:
                            add(chunk_idx, chunk_id, size, csize)
            archive_id_hex = hexlify(archive_id).decode('ascii')
            file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
            chunk_idx.write(file_tmp)
            tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex)
            tarinfo.pax_headers['archive_name'] = archive[b'name']
            with open(file_tmp, 'rb') as f:
                tf_out.addfile(tarinfo, f)
            os.unlink(file_tmp)

        def create_master_idx(chunk_idx, tf_in, tmp_dir):
            chunk_idx.clear()
            for tarinfo in tf_in:
                archive_id_hex = tarinfo.name
                archive_name = tarinfo.pax_headers['archive_name']
                print("- extracting archive %s ..." % archive_name)
                tf_in.extract(archive_id_hex, tmp_dir)
                chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
                print("- reading archive ...")
                archive_chunk_idx = ChunkIndex.read(chunk_idx_path)
                print("- merging archive ...")
                chunk_idx.merge(archive_chunk_idx)
                os.unlink(chunk_idx_path)

        self.begin_txn()
        print('Synchronizing chunks cache...')
        # XXX we have to do stuff on disk due to lacking ChunkIndex api
        with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir:
            repository = cache_if_remote(self.repository)
            out_archive = open_out_archive()
            in_archive = open_in_archive()
            if in_archive:
                known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames())
            else:
                known_ids = set()
            archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
            print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % (
                len(known_ids), len(archive_ids), len(archive_ids - known_ids), ))
            for archive_id in archive_ids & known_ids:
                transfer_known_idx(archive_id, in_archive, out_archive)
            close_archive(in_archive)
            delete_in_archive()  # free disk space
            for archive_id in archive_ids - known_ids:
                fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive)
            close_archive(out_archive)
            rename_out_archive()
            print('Merging collection into master chunks cache...')
            in_archive = open_in_archive()
            create_master_idx(self.chunks, in_archive, tmp_dir)
            close_archive(in_archive)
            print('Done.')

    def add_chunk(self, id, data, stats):
        if not self.txn_active:
            self.begin_txn()
        if self.seen_chunk(id):
            return self.chunk_incref(id, stats)
        size = len(data)
        data = self.key.encrypt(data)
        csize = len(data)
        self.repository.put(id, data, wait=False)
        self.chunks[id] = (1, size, csize)
        stats.update(size, csize, True)
        return id, size, csize

    def seen_chunk(self, id):
        return self.chunks.get(id, (0, 0, 0))[0]

    def chunk_incref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        self.chunks[id] = (count + 1, size, csize)
        stats.update(size, csize, False)
        return id, size, csize

    def chunk_decref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        if count == 1:
            del self.chunks[id]
            self.repository.delete(id, wait=False)
            stats.update(-size, -csize, True)
        else:
            self.chunks[id] = (count - 1, size, csize)
            stats.update(-size, -csize, False)
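
    # Illustrative sketch only (callers live outside this module; key.id_hash
    # and stats are assumptions about the caller, not defined here): a backup
    # writer deduplicates by routing every chunk through add_chunk(), which
    # either uploads a new chunk or just bumps the refcount of a known one:
    #
    #   id = key.id_hash(data)
    #   cache.add_chunk(id, data, stats)    # new: encrypt + put, refcount = 1
    #                                       # known: chunk_incref(), no upload
    #   ...
    #   cache.chunk_decref(id, stats)       # deletes from the repo at refcount 0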

    def file_known_and_unchanged(self, path_hash, st):
        if not self.do_files:
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = msgpack.unpackb(entry)
        if entry[2] == st.st_size and bigint_to_int(entry[3]) == st_mtime_ns(st) and entry[1] == st.st_ino:
            # reset entry age
            entry[0] = 0
            self.files[path_hash] = msgpack.packb(entry)
            return entry[4]
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
        if not self.do_files:
            return
        # Entry: Age, inode, size, mtime, chunk ids
        mtime_ns = st_mtime_ns(st)
        self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
        self._newest_mtime = max(self._newest_mtime, mtime_ns)
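
    # Illustrative sketch only (the real caller lives outside this module;
    # path_hash and st are assumptions supplied by that caller): the expected
    # fast path for unchanged files is to consult the cache before chunking:
    #
    #   ids = cache.file_known_and_unchanged(path_hash, st)
    #   if ids is not None:
    #       ...  # reuse the cached chunk ids instead of re-reading the file
    #   else:
    #       ...  # chunk the file, then cache.memorize_file(path_hash, st, ids)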