from configparser import RawConfigParser
from binascii import hexlify
import errno
import os
import shutil
import sys
import tarfile
import tempfile
import threading

import msgpack

from .remote import cache_if_remote
from .key import PlaintextKey
from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \
    bigint_to_int
from .locking import UpgradableLock
from .hashindex import ChunkIndex


class Cache:
    """Client Side cache
    """
    class RepositoryReplay(Error):
        """Cache is newer than repository, refusing to continue"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    class ChunkSizeNotReady(Exception):
        """computation of some chunk size is not yet finished"""

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True):
        self.lock = None
        self.timestamp = None
        self.thread_lock = threading.Lock()
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        self.do_files = do_files
        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            if warn_if_unencrypted and isinstance(key, PlaintextKey):
                if not self._confirm('Warning: Attempting to access a previously unknown unencrypted repository',
                                     'BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                    raise self.CacheInitAbortedError()
            self.create()
        self.open()
        # Warn user before sending data to a relocated repository
        if self.previous_location and self.previous_location != repository._location.canonical_path():
            msg = 'Warning: The repository at location {} was previously located at {}'.format(
                repository._location.canonical_path(), self.previous_location)
            if not self._confirm(msg, 'BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise self.RepositoryAccessAborted()
        if sync and self.manifest.id != self.manifest_id:
            # If the repository is older than the cache, something fishy is going on
            if self.timestamp and self.timestamp > manifest.timestamp:
                raise self.RepositoryReplay()
            # Make sure an encrypted repository has not been swapped for an unencrypted repository
            if self.key_type is not None and self.key_type != str(key.TYPE):
                raise self.EncryptionMethodMismatch()
            self.sync()
            self.commit()

    def __del__(self):
        self.close()

    def _confirm(self, message, env_var_override=None):
        print(message, file=sys.stderr)
        if env_var_override and os.environ.get(env_var_override):
            print("Yes (From {})".format(env_var_override))
            return True
        if not sys.stdin.isatty():
            return False
        try:
            answer = input('Do you want to continue? [yN] ')
        except EOFError:
            return False
        return answer and answer in 'Yy'

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write('This is a Borg cache')
        config = RawConfigParser()
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
        config.set('cache', 'manifest', '')
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd:
            pass  # empty file
        with open(os.path.join(self.path, 'files'), 'wb') as fd:
            pass  # empty file

    def destroy(self):
        """destroy the cache at `self.path`
        """
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def _do_open(self):
        self.config = RawConfigParser()
        self.config.read(os.path.join(self.path, 'config'))
        if self.config.getint('cache', 'version') != 1:
            raise Exception('%s does not look like a Borg cache' % self.path)
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache' % self.path)
        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
        self.rollback()

    def close(self):
        if self.lock:
            self.lock.release()
            self.lock = None

    def _read_files(self):
        self.files = {}
        self._newest_mtime = 0
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    item[0] += 1
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(item)
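
    # Each files cache entry is a msgpack-packed list:
    #   [age, inode, size, mtime_as_bigint, chunk_ids]
    # (see memorize_file below). _read_files() bumps the age of every entry on
    # load; commit() drops entries whose age reached 10 or whose mtime equals the
    # newest seen mtime, so stale or mtime-ambiguous entries expire over time.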

    def begin_txn(self):
        # Initialize transaction snapshot
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir)
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True
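
    # Transaction protocol: begin_txn() snapshots the current cache state into
    # txn.tmp and atomically renames it to txn.active. commit() writes the new
    # state in place and then discards txn.active; rollback() restores the
    # snapshot from txn.active if one exists. A crash therefore leaves either
    # the old state (txn.active still present) or the new one (already removed).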

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        if self.files is not None:
            with open(os.path.join(self.path, 'files'), 'wb') as fd:
                for path_hash, item in self.files.items():
                    # Discard cached files with the newest mtime to avoid
                    # issues with filesystem snapshots and mtime precision
                    item = msgpack.unpackb(item)
                    if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
                        msgpack.pack((path_hash, item), fd)
        self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        If present, uses a compressed tar archive of known backup archive
        indices, so it only needs to fetch infos from repo and build a chunk
        index once per backup archive.

        If out of sync, the tar gets rebuilt from known + fetched chunk infos,
        so it has complete and current information about all backup archives.
        Finally, it builds the master chunks index by merging all indices from
        the tar.

        Note: compression (esp. xz) is very effective in keeping the tar
              relatively small compared to the files it contains.
        """
        in_archive_path = os.path.join(self.path, 'chunks.archive')
        out_archive_path = os.path.join(self.path, 'chunks.archive.tmp')

        def open_in_archive():
            try:
                tf = tarfile.open(in_archive_path, 'r')
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
                # file not found
                tf = None
            except tarfile.ReadError:
                # empty file?
                tf = None
            return tf

        def open_out_archive():
            for compression in ('xz', 'bz2', 'gz'):
                # xz needs py 3.3, bz2 and gz also work on 3.2
                try:
                    tf = tarfile.open(out_archive_path, 'w:' + compression, format=tarfile.PAX_FORMAT)
                    break
                except tarfile.CompressionError:
                    continue
            else:  # shouldn't happen
                tf = None
            return tf

        def close_archive(tf):
            if tf:
                tf.close()

        def delete_in_archive():
            os.unlink(in_archive_path)

        def rename_out_archive():
            os.rename(out_archive_path, in_archive_path)

        def add(chunk_idx, id, size, csize, incr=1):
            try:
                count, size, csize = chunk_idx[id]
                chunk_idx[id] = count + incr, size, csize
            except KeyError:
                chunk_idx[id] = incr, size, csize

        def transfer_known_idx(archive_id, tf_in, tf_out):
            archive_id_hex = hexlify(archive_id).decode('ascii')
            tarinfo = tf_in.getmember(archive_id_hex)
            archive_name = tarinfo.pax_headers['archive_name']
            print('Already known archive:', archive_name)
            f_in = tf_in.extractfile(archive_id_hex)
            tf_out.addfile(tarinfo, f_in)
            return archive_name

        def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out):
            chunk_idx = ChunkIndex()
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            add(chunk_idx, archive_id, len(data), len(cdata))
            archive = msgpack.unpackb(data)
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name',))
            print('Analyzing new archive:', archive[b'name'])
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
                data = key.decrypt(item_id, chunk)
                add(chunk_idx, item_id, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        print('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    if b'chunks' in item:
                        for chunk_id, size, csize in item[b'chunks']:
                            add(chunk_idx, chunk_id, size, csize)
            archive_id_hex = hexlify(archive_id).decode('ascii')
            file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
            chunk_idx.write(file_tmp)
            tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex)
            tarinfo.pax_headers['archive_name'] = archive[b'name']
            with open(file_tmp, 'rb') as f:
                tf_out.addfile(tarinfo, f)
            os.unlink(file_tmp)

        def create_master_idx(chunk_idx, tf_in, tmp_dir):
            chunk_idx.clear()
            for tarinfo in tf_in:
                archive_id_hex = tarinfo.name
                archive_name = tarinfo.pax_headers['archive_name']
                print("- extracting archive %s ..." % archive_name)
                tf_in.extract(archive_id_hex, tmp_dir)
                chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
                print("- reading archive ...")
                archive_chunk_idx = ChunkIndex.read(chunk_idx_path)
                print("- merging archive ...")
                chunk_idx.merge(archive_chunk_idx)
                os.unlink(chunk_idx_path)

        self.begin_txn()
        print('Synchronizing chunks cache...')
        # XXX we have to do stuff on disk due to lacking ChunkIndex api
        with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir:
            repository = cache_if_remote(self.repository)
            out_archive = open_out_archive()
            in_archive = open_in_archive()
            if in_archive:
                known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames())
            else:
                known_ids = set()
            archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
            print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % (
                len(known_ids), len(archive_ids), len(archive_ids - known_ids)))
            for archive_id in archive_ids & known_ids:
                transfer_known_idx(archive_id, in_archive, out_archive)
            close_archive(in_archive)
            delete_in_archive()  # free disk space
            for archive_id in archive_ids - known_ids:
                fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive)
            close_archive(out_archive)
            rename_out_archive()
            print('Merging collection into master chunks cache...')
            in_archive = open_in_archive()
            create_master_idx(self.chunks, in_archive, tmp_dir)
            close_archive(in_archive)
            print('Done.')

    def add_chunk(self, id, data, stats):
        if not self.txn_active:
            self.begin_txn()
        if self.seen_chunk(id):
            return self.chunk_incref(id, stats)
        size = len(data)
        data = self.key.encrypt(data)
        csize = len(data)
        self.repository.put(id, data, wait=False)
        self.chunks[id] = (1, size, csize)
        stats.update(size, csize, True)
        return id, size, csize

    def chunk_modify(self, id, count=None, delta=None, size=None, csize=None):
        """modify a self.chunks entry, return the new value.
        must be thread safe.
        """
        with self.thread_lock:
            _count, _size, _csize = self.chunks[id]
            modified = False
            if size is not None and size != _size:
                assert _size == 0
                _size = size
                modified = True
            if csize is not None and csize != _csize:
                assert _csize == 0
                _csize = csize
                modified = True
            if count is not None and count != _count:
                assert _count == 0
                _count = count
                modified = True
            if delta is not None and delta != 0:
                _count += delta
                assert _count >= 0
                modified = True
            if modified:
                self.chunks[id] = _count, _size, _csize
            return _count, _size, _csize

    def add_chunk_nostats(self, cchunk, id, size, csize):
        # do not update stats here, see postprocess
        if not self.txn_active:
            self.begin_txn()
        new_chunk = cchunk is not None
        if new_chunk:
            # note: count = 1 already set in seen_or_announce_chunk
            _, size, csize = self.chunk_modify(id, size=size, csize=csize)
            self.repository.put(id, cchunk, wait=False)
        else:
            # note: csize might be still 0 (not yet computed) here
            _, size, csize = self.chunk_modify(id, delta=1, size=size)
        return size, csize, new_chunk

    def postprocess_results(self, size_infos, results, stats):
        # We need to do some post processing:
        # - duplicate chunks may have their csize not yet set correctly due to
        #   the multi-threaded processing; any (size, 0) entry must still be
        #   filled in with the correct csize from the other duplicate chunk.
        # - we need to reconstruct the correct order of the chunks.
        # - we need to fix the stats now that we have the correct csize.
        chunks = []
        for _, id, new_chunk in sorted(results):
            try:
                size, csize = size_infos[id]
            except KeyError:
                raise self.ChunkSizeNotReady
            chunks.append((id, size, csize, new_chunk))
        # do another pass after we have made sure we have all size info
        results = []
        for id, size, csize, new_chunk in chunks:
            stats.update(size, csize, new_chunk)
            results.append((id, size, csize))
        return results
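
    # Threaded ingestion path (a sketch, inferred from the methods above/below):
    # a worker first calls seen_or_announce_chunk(id, size) to reserve the id,
    # then add_chunk_nostats() to store new data and adjust refcounts, and the
    # coordinating thread finally calls postprocess_results() once per batch to
    # fill in csizes, restore chunk order and update the statistics.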

    def seen_chunk(self, id):
        return self.chunks.get(id, (0, 0, 0))[0]

    def seen_or_announce_chunk(self, id, size):
        """Return True if we have already seen chunk <id> (thus, we already have it or will have it soon).
        If we have not seen it, announce its (future) availability and return False.
        Must be thread safe.
        """
        with self.thread_lock:
            try:
                # did we see this id already (and is count > 0)?
                count, _size, _csize = self.chunks[id]
                assert size == _size
                return count > 0
            except KeyError:
                # announce that we will put this chunk soon,
                # so that deduplication knows we already have it.
                self.chunks[id] = 1, size, 0
                return False

    def chunk_incref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        self.chunks[id] = (count + 1, size, csize)
        stats.update(size, csize, False)
        return id, size, csize

    def chunk_decref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        if count == 1:
            del self.chunks[id]
            self.repository.delete(id, wait=False)
            stats.update(-size, -csize, True)
        else:
            self.chunks[id] = (count - 1, size, csize)
            stats.update(-size, -csize, False)

    def file_known_and_unchanged(self, path_hash, st):
        if not self.do_files:
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = msgpack.unpackb(entry)
        if entry[2] == st.st_size and bigint_to_int(entry[3]) == st_mtime_ns(st) and entry[1] == st.st_ino:
            # reset entry age
            entry[0] = 0
            self.files[path_hash] = msgpack.packb(entry)
            return entry[4]
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
        if not self.do_files:
            return
        # Entry: Age, inode, size, mtime, chunk ids
        mtime_ns = st_mtime_ns(st)
        self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
        self._newest_mtime = max(self._newest_mtime, mtime_ns)
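
# Typical usage (a minimal sketch; the repository, key and manifest objects come
# from the surrounding application and are assumed to be set up already):
#
#   cache = Cache(repository, key, manifest)   # opens/creates and syncs the cache
#   cache.add_chunk(chunk_id, data, stats)     # stores a chunk, deduplicating via the chunks index;
#                                              # stats is an object with .update(size, csize, new)
#   cache.commit()                             # persists chunks/files index and config
#   cache.close()                              # releases the cache lock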