cache.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. import configparser
  2. from .remote import cache_if_remote
  3. from collections import namedtuple
  4. import os
  5. import stat
  6. from binascii import hexlify, unhexlify
  7. import shutil
  8. from .key import PlaintextKey
  9. from .logger import create_logger
  10. logger = create_logger()
  11. from .helpers import Error, get_cache_dir, decode_dict, int_to_bigint, \
  12. bigint_to_int, format_file_size, yes
  13. from .locking import UpgradableLock
  14. from .hashindex import ChunkIndex
  15. import msgpack
  16. class Cache:
  17. """Client Side cache
  18. """
  # Error types raised by Cache; each one only carries a docstring.
  # NOTE(review): the docstrings presumably double as the user-facing message
  # through the imported ``Error`` base class - confirm before rewording them.

  # Raised by __init__ when the cached timestamp is newer than the manifest's.
  class RepositoryReplay(Error):
      """Cache is newer than repository, refusing to continue"""

  # Raised by __init__ when the user declines to use an unknown unencrypted repository.
  class CacheInitAbortedError(Error):
      """Cache initialization aborted"""

  # Raised by __init__ when the user declines to use a relocated repository.
  class RepositoryAccessAborted(Error):
      """Repository access aborted"""

  # Raised by __init__ when the cached key type differs from the current key's type.
  class EncryptionMethodMismatch(Error):
      """Repository encryption method changed since last access, refusing to continue"""
  @staticmethod
  def break_lock(repository, path=None):
      """Break the lock of a cache directory.

      The cache directory is ``path`` when a truthy value is given; otherwise it
      is the hex-encoded ``repository.id`` below ``get_cache_dir()``.
      """
      if not path:
          repo_hex = hexlify(repository.id).decode('ascii')
          path = os.path.join(get_cache_dir(), repo_hex)
      lock_path = os.path.join(path, 'lock')
      UpgradableLock(lock_path, exclusive=True).break_lock()
  31. @staticmethod
  32. def destroy(repository, path=None):
  33. """destroy the cache for ``repository`` or at ``path``"""
  34. path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
  35. config = os.path.join(path, 'config')
  36. if os.path.exists(config):
  37. os.remove(config) # kill config first
  38. shutil.rmtree(path)
  def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
               lock_wait=None):
      """Open (creating it first if necessary) the client side cache of ``repository``.

      :param repository: repository object; ``repository.id`` names the default
          cache directory and ``repository._location`` is used for the relocation check
      :param key: key object; a ``PlaintextKey`` triggers the unencrypted-repository
          warning, ``key.TYPE`` is compared against the cached key type
      :param manifest: manifest object; its ``id`` and ``timestamp`` are compared
          against the values stored in the cache
      :param path: cache directory; defaults to ``get_cache_dir()/<hex repository id>``
      :param sync: if true, re-synchronize the cache when the manifest id differs
      :param do_files: enable the files cache
      :param warn_if_unencrypted: ask for confirmation before a cache is created
          for a repository that uses a ``PlaintextKey``
      :param lock_wait: passed on to :meth:`open` as the lock timeout
      :raises CacheInitAbortedError: the unencrypted-repository question was declined
      :raises RepositoryAccessAborted: the relocated-repository question was declined
      :raises RepositoryReplay: the cached timestamp is newer than the manifest's
      :raises EncryptionMethodMismatch: the cached key type differs from ``key.TYPE``
      """
      self.lock = None
      self.timestamp = None
      self.txn_active = False
      self.repository = repository
      self.key = key
      self.manifest = manifest
      self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
      self.do_files = do_files
      # Warn user before sending data to a never seen before unencrypted repository
      if not os.path.exists(self.path):
          if warn_if_unencrypted and isinstance(key, PlaintextKey):
              msg = ("Warning: Attempting to access a previously unknown unencrypted repository!" +
                     "\n" +
                     "Do you want to continue? [yN] ")
              if not yes(msg, false_msg="Aborting.", env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                  raise self.CacheInitAbortedError()
          self.create()
      self.open(lock_wait=lock_wait)
      try:
          # Warn user before sending data to a relocated repository
          if self.previous_location and self.previous_location != repository._location.canonical_path():
              msg = ("Warning: The repository at location {} was previously located at {}".format(
                         repository._location.canonical_path(), self.previous_location) +
                     "\n" +
                     "Do you want to continue? [yN] ")
              if not yes(msg, false_msg="Aborting.", env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                  raise self.RepositoryAccessAborted()
          if sync and self.manifest.id != self.manifest_id:
              # If repository is older than the cache something fishy is going on
              if self.timestamp and self.timestamp > manifest.timestamp:
                  raise self.RepositoryReplay()
              # Make sure an encrypted repository has not been swapped for an unencrypted repository
              if self.key_type is not None and self.key_type != str(key.TYPE):
                  raise self.EncryptionMethodMismatch()
              self.sync()
              self.commit()
      except BaseException:
          # Deliberately catches everything (including KeyboardInterrupt) so the
          # cache lock taken by open() is always released before re-raising.
          self.close()
          raise
  80. def __enter__(self):
  81. return self
  82. def __exit__(self, exc_type, exc_val, exc_tb):
  83. self.close()
  def __str__(self):
      """Return a textual summary built from the values of :meth:`format_tuple`."""
      # NOTE(review): the column alignment whitespace (and possibly a blank line)
      # inside this template looks like it was collapsed when the file was
      # extracted - compare with the upstream source before relying on the layout.
      fmt = """\
All archives: {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}
Unique chunks Total chunks
Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
      return fmt.format(self.format_tuple())
  def format_tuple(self):
      """Return the chunk index summary as a ``Summary`` namedtuple.

      ``total_size``, ``total_csize`` and ``unique_csize`` are passed through
      ``format_file_size``; the remaining fields keep the values returned by
      ``self.chunks.summarize()``.
      """
      # XXX: this should really be moved down to `hashindex.pyx`
      Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize',
                                       'total_unique_chunks', 'total_chunks'])
      raw = Summary(*self.chunks.summarize())
      formatted = {name: format_file_size(getattr(raw, name))
                   for name in ('total_size', 'total_csize', 'unique_csize')}
      return raw._replace(**formatted)
  def create(self):
      """Create a new empty cache at `self.path`

      Writes a README, a version 1 config naming the repository, an empty chunk
      index, the ``chunks.archive.d`` directory and an empty ``files`` file.
      """
      def in_cache(name):
          return os.path.join(self.path, name)

      os.makedirs(self.path)
      with open(in_cache('README'), 'w') as readme:
          readme.write('This is a Borg cache')
      parser = configparser.ConfigParser(interpolation=None)
      parser.add_section('cache')
      parser.set('cache', 'version', '1')
      parser.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
      parser.set('cache', 'manifest', '')
      with open(in_cache('config'), 'w') as config_file:
          parser.write(config_file)
      ChunkIndex().write(in_cache('chunks').encode('utf-8'))
      os.makedirs(in_cache('chunks.archive.d'))
      # the files cache starts out as an empty file
      with open(in_cache('files'), 'wb'):
          pass
  114. def _do_open(self):
  115. self.config = configparser.ConfigParser(interpolation=None)
  116. config_path = os.path.join(self.path, 'config')
  117. self.config.read(config_path)
  118. try:
  119. cache_version = self.config.getint('cache', 'version')
  120. wanted_version = 1
  121. if cache_version != wanted_version:
  122. raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
  123. config_path, cache_version, wanted_version))
  124. except configparser.NoSectionError:
  125. raise Exception('%s does not look like a Borg cache.' % config_path) from None
  126. self.id = self.config.get('cache', 'repository')
  127. self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
  128. self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
  129. self.key_type = self.config.get('cache', 'key_type', fallback=None)
  130. self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
  131. self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
  132. self.files = None
  133. def open(self, lock_wait=None):
  134. if not os.path.isdir(self.path):
  135. raise Exception('%s Does not look like a Borg cache' % self.path)
  136. self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
  137. self.rollback()
  138. def close(self):
  139. if self.lock is not None:
  140. self.lock.release()
  141. self.lock = None
  def _read_files(self):
      """Load the files cache from ``self.path`` into ``self.files``.

      Every entry's first element (its age) is incremented by one while loading.
      Entries are kept msgpack-packed in memory.
      """
      self.files = {}
      self._newest_mtime = 0
      logger.debug('Reading files cache ...')
      with open(os.path.join(self.path, 'files'), 'rb') as fd:
          unpacker = msgpack.Unpacker(use_list=True)
          # read in 64 KiB blocks until read() returns an empty bytes object
          for block in iter(lambda: fd.read(64 * 1024), b''):
              unpacker.feed(block)
              for path_hash, entry in unpacker:
                  entry[0] += 1
                  # in the end, this takes about 240 Bytes per file
                  self.files[path_hash] = msgpack.packb(entry)
  157. def begin_txn(self):
  158. # Initialize transaction snapshot
  159. txn_dir = os.path.join(self.path, 'txn.tmp')
  160. os.mkdir(txn_dir)
  161. shutil.copy(os.path.join(self.path, 'config'), txn_dir)
  162. shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
  163. shutil.copy(os.path.join(self.path, 'files'), txn_dir)
  164. os.rename(os.path.join(self.path, 'txn.tmp'),
  165. os.path.join(self.path, 'txn.active'))
  166. self.txn_active = True
  167. def commit(self):
  168. """Commit transaction
  169. """
  170. if not self.txn_active:
  171. return
  172. if self.files is not None:
  173. ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20))
  174. with open(os.path.join(self.path, 'files'), 'wb') as fd:
  175. for path_hash, item in self.files.items():
  176. # Discard cached files with the newest mtime to avoid
  177. # issues with filesystem snapshots and mtime precision
  178. item = msgpack.unpackb(item)
  179. if item[0] < ttl and bigint_to_int(item[3]) < self._newest_mtime:
  180. msgpack.pack((path_hash, item), fd)
  181. self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
  182. self.config.set('cache', 'timestamp', self.manifest.timestamp)
  183. self.config.set('cache', 'key_type', str(self.key.TYPE))
  184. self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
  185. with open(os.path.join(self.path, 'config'), 'w') as fd:
  186. self.config.write(fd)
  187. self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
  188. os.rename(os.path.join(self.path, 'txn.active'),
  189. os.path.join(self.path, 'txn.tmp'))
  190. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  191. self.txn_active = False
  192. def rollback(self):
  193. """Roll back partial and aborted transactions
  194. """
  195. # Remove partial transaction
  196. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  197. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  198. # Roll back active transaction
  199. txn_dir = os.path.join(self.path, 'txn.active')
  200. if os.path.exists(txn_dir):
  201. shutil.copy(os.path.join(txn_dir, 'config'), self.path)
  202. shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
  203. shutil.copy(os.path.join(txn_dir, 'files'), self.path)
  204. os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
  205. if os.path.exists(os.path.join(self.path, 'txn.tmp')):
  206. shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
  207. self.txn_active = False
  208. self._do_open()
  def sync(self):
      """Re-synchronize chunks cache with repository.

      Maintains a directory with known backup archive indexes, so it only
      needs to fetch infos from repo and build a chunk index once per backup
      archive.
      If out of sync, missing archive indexes get added, outdated indexes
      get removed and a new master chunks index is built by merging all
      archive indexes.

      Starts a transaction (``begin_txn``) and replaces ``self.chunks`` with the
      rebuilt index; committing is left to the caller.
      """
      archive_path = os.path.join(self.path, 'chunks.archive.d')

      def mkpath(archive_id, suffix=''):
          """Return the utf-8 encoded path of the cached index file of ``archive_id``."""
          id_hex = hexlify(archive_id).decode('ascii')
          return os.path.join(archive_path, id_hex + suffix).encode('utf-8')

      def cached_archives():
          """Return the ids of all archives that have a cached index file."""
          if not self.do_cache:
              return set()
          # filenames with 64 hex digits == 256bit
          return {unhexlify(fn) for fn in os.listdir(archive_path) if len(fn) == 64}

      def repo_archives():
          """Return the ids of all archives listed in the manifest."""
          return {info[b'id'] for info in self.manifest.archives.values()}

      def cleanup_outdated(archive_ids):
          """Delete cached index files of archives that are gone from the manifest."""
          for archive_id in archive_ids:
              os.unlink(mkpath(archive_id))

      def fetch_and_build_idx(archive_id, repository, key):
          """Build the chunk index of one archive from the repository (and cache it)."""
          chunk_idx = ChunkIndex()
          cdata = repository.get(archive_id)
          data = key.decrypt(archive_id, cdata)
          chunk_idx.add(archive_id, 1, len(data), len(cdata))
          archive = msgpack.unpackb(data)
          if archive[b'version'] != 1:
              raise Exception('Unknown archive metadata version')
          decode_dict(archive, (b'name',))
          unpacker = msgpack.Unpacker()
          for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
              data = key.decrypt(item_id, chunk)
              chunk_idx.add(item_id, 1, len(data), len(chunk))
              unpacker.feed(data)
              for item in unpacker:
                  if not isinstance(item, dict):
                      logger.error('Error: Did not get expected metadata dict - archive corrupted!')
                      continue
                  if b'chunks' in item:
                      for chunk_id, size, csize in item[b'chunks']:
                          chunk_idx.add(chunk_id, 1, size, csize)
          if self.do_cache:
              fn = mkpath(archive_id)
              fn_tmp = mkpath(archive_id, suffix='.tmp')
              try:
                  chunk_idx.write(fn_tmp)
              except Exception as exc:
                  # Caching the per-archive index is best effort: the index built
                  # in memory is still returned, but the failure is reported and
                  # a temp file that was never created must not raise here.
                  logger.warning('Could not cache chunk index of archive %s: %s' % (
                      hexlify(archive_id).decode('ascii'), exc))
                  try:
                      os.unlink(fn_tmp)
                  except FileNotFoundError:
                      pass
              else:
                  os.rename(fn_tmp, fn)
          return chunk_idx

      def create_master_idx(chunk_idx, repository):
          """Rebuild the master chunk index by merging all per-archive indexes."""
          logger.info('Synchronizing chunks cache...')
          cached_ids = cached_archives()
          archive_ids = repo_archives()
          logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
              len(archive_ids), len(cached_ids),
              len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
          # deallocates old hashindex, creates empty hashindex:
          chunk_idx.clear()
          cleanup_outdated(cached_ids - archive_ids)
          if archive_ids:
              # one pass over the manifest instead of a scan per archive;
              # setdefault keeps the first name found for an id
              names_by_id = {}
              for name, info in self.manifest.archives.items():
                  names_by_id.setdefault(info[b'id'], name)
              chunk_idx = None
              for archive_id in archive_ids:
                  archive_name = names_by_id.get(archive_id)
                  if archive_id in cached_ids:
                      archive_chunk_idx_path = mkpath(archive_id)
                      logger.info("Reading cached archive chunk index for %s ..." % archive_name)
                      archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
                  else:
                      logger.info('Fetching and building archive index for %s ...' % archive_name)
                      archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key)
                  logger.info("Merging into master chunks index ...")
                  if chunk_idx is None:
                      # we just use the first archive's idx as starting point,
                      # to avoid growing the hash table from 0 size and also
                      # to save 1 merge call.
                      chunk_idx = archive_chunk_idx
                  else:
                      chunk_idx.merge(archive_chunk_idx)
          logger.info('Done.')
          return chunk_idx

      def legacy_cleanup():
          """bring old cache dirs into the desired state (cleanup and adapt)"""
          for legacy_name in ('chunks.archive', 'chunks.archive.tmp'):
              try:
                  os.unlink(os.path.join(self.path, legacy_name))
              except OSError:
                  # usually there simply is no legacy file to remove
                  pass
          try:
              os.mkdir(archive_path)
          except OSError:
              # already present (as a directory, or as a file - see the hack below)
              pass

      self.begin_txn()
      with cache_if_remote(self.repository) as repository:
          legacy_cleanup()
          # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
          # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
          self.do_cache = os.path.isdir(archive_path)
          self.chunks = create_master_idx(self.chunks, repository)
  322. def add_chunk(self, id, data, stats):
  323. if not self.txn_active:
  324. self.begin_txn()
  325. size = len(data)
  326. if self.seen_chunk(id, size):
  327. return self.chunk_incref(id, stats)
  328. data = self.key.encrypt(data)
  329. csize = len(data)
  330. self.repository.put(id, data, wait=False)
  331. self.chunks[id] = (1, size, csize)
  332. stats.update(size, csize, True)
  333. return id, size, csize
  334. def seen_chunk(self, id, size=None):
  335. refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
  336. if size is not None and stored_size is not None and size != stored_size:
  337. # we already have a chunk with that id, but different size.
  338. # this is either a hash collision (unlikely) or corruption or a bug.
  339. raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
  340. id, stored_size, size))
  341. return refcount
  342. def chunk_incref(self, id, stats):
  343. if not self.txn_active:
  344. self.begin_txn()
  345. count, size, csize = self.chunks.incref(id)
  346. stats.update(size, csize, False)
  347. return id, size, csize
  348. def chunk_decref(self, id, stats):
  349. if not self.txn_active:
  350. self.begin_txn()
  351. count, size, csize = self.chunks.decref(id)
  352. if count == 0:
  353. del self.chunks[id]
  354. self.repository.delete(id, wait=False)
  355. stats.update(-size, -csize, True)
  356. else:
  357. stats.update(-size, -csize, False)
  358. def file_known_and_unchanged(self, path_hash, st, ignore_inode=False):
  359. if not (self.do_files and stat.S_ISREG(st.st_mode)):
  360. return None
  361. if self.files is None:
  362. self._read_files()
  363. entry = self.files.get(path_hash)
  364. if not entry:
  365. return None
  366. entry = msgpack.unpackb(entry)
  367. if (entry[2] == st.st_size and bigint_to_int(entry[3]) == st.st_mtime_ns and
  368. (ignore_inode or entry[1] == st.st_ino)):
  369. # reset entry age
  370. entry[0] = 0
  371. self.files[path_hash] = msgpack.packb(entry)
  372. return entry[4]
  373. else:
  374. return None
  375. def memorize_file(self, path_hash, st, ids):
  376. if not (self.do_files and stat.S_ISREG(st.st_mode)):
  377. return
  378. # Entry: Age, inode, size, mtime, chunk ids
  379. mtime_ns = st.st_mtime_ns
  380. self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
  381. self._newest_mtime = max(self._newest_mtime, mtime_ns)