cache.py

import configparser
from .remote import cache_if_remote
from collections import namedtuple
import os
import stat
from binascii import hexlify, unhexlify
import shutil

from .key import PlaintextKey
from .logger import create_logger
logger = create_logger()
from .helpers import Error, get_cache_dir, decode_dict, int_to_bigint, \
    bigint_to_int, format_file_size, yes
from .locking import UpgradableLock
from .hashindex import ChunkIndex

import msgpack


class Cache:
    """Client-side cache
    """
    class RepositoryReplay(Error):
        """Cache is newer than repository, refusing to continue"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    @staticmethod
    def break_lock(repository, path=None):
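        """Forcibly break the lock on the cache of `repository` (or on the cache at `path`)."""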
        path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        UpgradableLock(os.path.join(path, 'lock'), exclusive=True).break_lock()

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
                 lock_wait=None):
        self.lock = None
        self.timestamp = None
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        self.do_files = do_files
        # Warn user before sending data to a never-seen-before unencrypted repository
        if not os.path.exists(self.path):
            if warn_if_unencrypted and isinstance(key, PlaintextKey):
                msg = ("Warning: Attempting to access a previously unknown unencrypted repository!" +
                       "\n" +
                       "Do you want to continue? [yN] ")
                if not yes(msg, false_msg="Aborting.", env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                    raise self.CacheInitAbortedError()
            self.create()
        self.open(lock_wait=lock_wait)
        try:
            # Warn user before sending data to a relocated repository
            if self.previous_location and self.previous_location != repository._location.canonical_path():
                msg = ("Warning: The repository at location {} was previously located at {}".format(
                           repository._location.canonical_path(), self.previous_location) +
                       "\n" +
                       "Do you want to continue? [yN] ")
                if not yes(msg, false_msg="Aborting.", env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                    raise self.RepositoryAccessAborted()

            if sync and self.manifest.id != self.manifest_id:
                # If the repository is older than the cache, something fishy is going on
                if self.timestamp and self.timestamp > manifest.timestamp:
                    raise self.RepositoryReplay()
                # Make sure an encrypted repository has not been swapped for an unencrypted repository
                if self.key_type is not None and self.key_type != str(key.TYPE):
                    raise self.EncryptionMethodMismatch()
                self.sync()
                self.commit()
        except:
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __str__(self):
        fmt = """\
All archives:   {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
        return fmt.format(self.format_tuple())

    def format_tuple(self):
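        """Return a Summary namedtuple of chunk index statistics, with total_size,
        total_csize and unique_csize formatted as human-readable strings (used by __str__)."""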
        # XXX: this should really be moved down to `hashindex.pyx`
        Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize',
                                         'total_unique_chunks', 'total_chunks'])
        stats = Summary(*self.chunks.summarize())._asdict()
        for field in ['total_size', 'total_csize', 'unique_csize']:
            stats[field] = format_file_size(stats[field])
        return Summary(**stats)

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write('This is a Borg cache')
        config = configparser.ConfigParser(interpolation=None)
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
        config.set('cache', 'manifest', '')
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with open(os.path.join(self.path, 'files'), 'wb') as fd:
            pass  # empty file

    def destroy(self):
        """destroy the cache at `self.path`
        """
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def _do_open(self):
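        """Read and validate the cache config and load the chunks index (does not acquire the lock)."""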
        self.config = configparser.ConfigParser(interpolation=None)
        config_path = os.path.join(self.path, 'config')
        self.config.read(config_path)
        try:
            cache_version = self.config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
                    config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            raise Exception('%s does not look like a Borg cache.' % config_path) from None
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self, lock_wait=None):
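        """Acquire the cache lock, then load the cache state, rolling back any unfinished transaction."""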
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache' % self.path)
        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
        self.rollback()

    def close(self):
        if self.lock is not None:
            self.lock.release()
            self.lock = None

    def _read_files(self):
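        """Load the files cache from disk into `self.files`, incrementing each entry's age counter."""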
        self.files = {}
        self._newest_mtime = 0
        logger.info('reading files cache')
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    item[0] += 1
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(item)

    def begin_txn(self):
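        """Start a transaction: snapshot config, chunks and files into txn.tmp, then rename it
        to txn.active so rollback() can restore the previous state after a crash."""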
        # Initialize transaction snapshot
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        if self.files is not None:
            with open(os.path.join(self.path, 'files'), 'wb') as fd:
                for path_hash, item in self.files.items():
                    # Discard cached files with the newest mtime to avoid
                    # issues with filesystem snapshots and mtime precision
                    item = msgpack.unpackb(item)
                    if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
                        msgpack.pack((path_hash, item), fd)
        self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch infos from repo and build a chunk index once per backup
        archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
        """
        archive_path = os.path.join(self.path, 'chunks.archive.d')

        def mkpath(id, suffix=''):
            id_hex = hexlify(id).decode('ascii')
            path = os.path.join(archive_path, id_hex + suffix)
            return path.encode('utf-8')

        def cached_archives():
            if self.do_cache:
                fns = os.listdir(archive_path)
                # filenames with 64 hex digits == 256bit
                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
            else:
                return set()

        def repo_archives():
            return set(info[b'id'] for info in self.manifest.archives.values())

        def cleanup_outdated(ids):
            for id in ids:
                os.unlink(mkpath(id))

        def fetch_and_build_idx(archive_id, repository, key):
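            """Fetch archive `archive_id` from the repository, decrypt and walk its item
            metadata stream, and build a ChunkIndex of all chunks it references; the index
            is cached on disk if self.do_cache is set."""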
            chunk_idx = ChunkIndex()
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            chunk_idx.add(archive_id, 1, len(data), len(cdata))
            archive = msgpack.unpackb(data)
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name',))
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
                data = key.decrypt(item_id, chunk)
                chunk_idx.add(item_id, 1, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        logger.error('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    if b'chunks' in item:
                        for chunk_id, size, csize in item[b'chunks']:
                            chunk_idx.add(chunk_id, 1, size, csize)
            if self.do_cache:
                fn = mkpath(archive_id)
                fn_tmp = mkpath(archive_id, suffix='.tmp')
                try:
                    chunk_idx.write(fn_tmp)
                except Exception:
                    os.unlink(fn_tmp)
                else:
                    os.rename(fn_tmp, fn)
            return chunk_idx

        def lookup_name(archive_id):
            for name, info in self.manifest.archives.items():
                if info[b'id'] == archive_id:
                    return name

        def create_master_idx(chunk_idx):
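            """Build the master chunk index by merging the per-archive chunk indexes
            (cached ones are read from disk, missing ones are fetched and built)."""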
            logger.info('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
                len(archive_ids), len(cached_ids),
                len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            if archive_ids:
                chunk_idx = None
                for archive_id in archive_ids:
                    archive_name = lookup_name(archive_id)
                    if archive_id in cached_ids:
                        archive_chunk_idx_path = mkpath(archive_id)
                        logger.info("Reading cached archive chunk index for %s ..." % archive_name)
                        archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
                    else:
                        logger.info('Fetching and building archive index for %s ...' % archive_name)
                        archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key)
                    logger.info("Merging into master chunks index ...")
                    if chunk_idx is None:
                        # we just use the first archive's idx as starting point,
                        # to avoid growing the hash table from 0 size and also
                        # to save 1 merge call.
                        chunk_idx = archive_chunk_idx
                    else:
                        chunk_idx.merge(archive_chunk_idx)
            logger.info('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except OSError:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except OSError:
                pass
            try:
                os.mkdir(archive_path)
            except OSError:
                pass

        self.begin_txn()
        with cache_if_remote(self.repository) as repository:
            legacy_cleanup()
            # TEMPORARY HACK: to avoid archive index caching, create a FILE named
            # ~/.cache/borg/REPOID/chunks.archive.d - this is only recommended if you have a
            # fast, low latency connection to your repo (e.g. if the repo is on local disk)
            self.do_cache = os.path.isdir(archive_path)
            self.chunks = create_master_idx(self.chunks)

    def add_chunk(self, id, data, stats):
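        """Encrypt and store `data` under `id` in the repository and add it to the chunk index,
        or just increase its reference count if the chunk is already known.
        Returns (id, size, csize)."""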
        if not self.txn_active:
            self.begin_txn()
        size = len(data)
        if self.seen_chunk(id, size):
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(data)
        csize = len(data)
        self.repository.put(id, data, wait=False)
        self.chunks[id] = (1, size, csize)
        stats.update(size, csize, True)
        return id, size, csize

    def seen_chunk(self, id, size=None):
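        """Return the reference count of chunk `id` (0 if unknown); if `size` is given,
        verify that it matches the stored size."""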
        refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                            id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats):
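        """Increase the reference count of chunk `id` and account for it in `stats`."""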
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        self.chunks[id] = (count + 1, size, csize)
        stats.update(size, csize, False)
        return id, size, csize

    def chunk_decref(self, id, stats):
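        """Decrease the reference count of chunk `id`; remove it from the index and delete it
        from the repository when the count drops to zero."""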
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        if count == 1:
            del self.chunks[id]
            self.repository.delete(id, wait=False)
            stats.update(-size, -csize, True)
        else:
            self.chunks[id] = (count - 1, size, csize)
            stats.update(-size, -csize, False)

    def file_known_and_unchanged(self, path_hash, st):
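        """Return the cached chunk ids for `path_hash` if inode, size and mtime still match `st`;
        otherwise, or if the files cache is not used for this file, return None."""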
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = msgpack.unpackb(entry)
        if entry[2] == st.st_size and bigint_to_int(entry[3]) == st.st_mtime_ns and entry[1] == st.st_ino:
            # reset entry age
            entry[0] = 0
            self.files[path_hash] = msgpack.packb(entry)
            return entry[4]
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
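        """Store a fresh files cache entry (age 0) for `path_hash`, built from `st` and the chunk `ids`."""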
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return
        # Entry: Age, inode, size, mtime, chunk ids
        mtime_ns = st.st_mtime_ns
        self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
        self._newest_mtime = max(self._newest_mtime, mtime_ns)