cache.py

import configparser
from .remote import cache_if_remote
import errno
import msgpack
import os
import stat
import sys
from binascii import hexlify
import shutil
import tarfile
import tempfile

from .key import PlaintextKey
from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \
    bigint_to_int
from .locking import UpgradableLock
from .hashindex import ChunkIndex


class Cache:
    """Client-side cache
    """

    class RepositoryReplay(Error):
        """Cache is newer than repository, refusing to continue"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True):
        self.lock = None
        self.timestamp = None
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        self.do_files = do_files
        # Warn user before sending data to a never-seen-before, unencrypted repository
        if not os.path.exists(self.path):
            if warn_if_unencrypted and isinstance(key, PlaintextKey):
                if not self._confirm('Warning: Attempting to access a previously unknown unencrypted repository',
                                     'BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                    raise self.CacheInitAbortedError()
            self.create()
        self.open()
        # Warn user before sending data to a relocated repository
        if self.previous_location and self.previous_location != repository._location.canonical_path():
            msg = 'Warning: The repository at location {} was previously located at {}'.format(
                repository._location.canonical_path(), self.previous_location)
            if not self._confirm(msg, 'BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise self.RepositoryAccessAborted()
        if sync and self.manifest.id != self.manifest_id:
            # If the repository is older than the cache, something fishy is going on
            if self.timestamp and self.timestamp > manifest.timestamp:
                raise self.RepositoryReplay()
            # Make sure an encrypted repository has not been swapped for an unencrypted repository
            if self.key_type is not None and self.key_type != str(key.TYPE):
                raise self.EncryptionMethodMismatch()
            self.sync()
            self.commit()

    def __del__(self):
        self.close()

    def _confirm(self, message, env_var_override=None):
        print(message, file=sys.stderr)
        if env_var_override and os.environ.get(env_var_override):
            print("Yes (From {})".format(env_var_override))
            return True
        if not sys.stdin.isatty():
            return False
        try:
            answer = input('Do you want to continue? [yN] ')
        except EOFError:
            return False
        return answer and answer in 'Yy'

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write('This is a Borg cache')
        config = configparser.RawConfigParser()
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
        config.set('cache', 'manifest', '')
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with open(os.path.join(self.path, 'files'), 'wb') as fd:
            pass  # empty file
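
    # On-disk layout produced by create() (summary of the code above):
    #
    #   <self.path>/README             marker text identifying a Borg cache
    #   <self.path>/config             [cache] section: version, repository, manifest, ...
    #   <self.path>/chunks             serialized master ChunkIndex
    #   <self.path>/chunks.archive.d/  per-archive chunk indexes maintained by sync()
    #   <self.path>/files              msgpack stream of per-file cache entries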

    def destroy(self):
        """Destroy the cache at `self.path`
        """
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def _do_open(self):
        self.config = configparser.RawConfigParser()
        config_path = os.path.join(self.path, 'config')
        self.config.read(config_path)
        try:
            cache_version = self.config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
                    config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            raise Exception('%s does not look like a Borg cache.' % config_path)
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache.' % self.path)
        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
        self.rollback()

    def close(self):
        if self.lock:
            self.lock.release()
            self.lock = None

    def _read_files(self):
        self.files = {}
        self._newest_mtime = 0
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    # age the entry (it is reset to 0 when the file is seen again)
                    item[0] += 1
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(item)

    def begin_txn(self):
        # Initialize transaction snapshot
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        if self.files is not None:
            with open(os.path.join(self.path, 'files'), 'wb') as fd:
                for path_hash, item in self.files.items():
                    # Discard cached files with the newest mtime to avoid
                    # issues with filesystem snapshots and mtime precision
                    item = msgpack.unpackb(item)
                    if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
                        msgpack.pack((path_hash, item), fd)
        self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()
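
    # Transaction protocol tying begin_txn(), commit() and rollback() together:
    # begin_txn() copies config/chunks/files into txn.tmp and renames it to
    # txn.active; commit() writes the new state and turns txn.active back into
    # txn.tmp before deleting it; rollback() restores the snapshot from a
    # leftover txn.active (and removes any stray txn.tmp), so an interrupted
    # transaction is undone the next time the cache is opened.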

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch info from the repository and build a chunk index once
        per backup archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
        """
        archive_path = os.path.join(self.path, 'chunks.archive.d')

        def mkpath(id, suffix=''):
            id_hex = hexlify(id).decode('ascii')
            path = os.path.join(archive_path, id_hex + suffix)
            return path.encode('utf-8')

        def cached_archives():
            if self.do_cache:
                fns = os.listdir(archive_path)
                # filenames with 64 hex digits == 256 bit
                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
            else:
                return set()

        def repo_archives():
            return set(info[b'id'] for info in self.manifest.archives.values())

        def cleanup_outdated(ids):
            for id in ids:
                os.unlink(mkpath(id))

        def add(chunk_idx, id, size, csize, incr=1):
            try:
                count, size, csize = chunk_idx[id]
                chunk_idx[id] = count + incr, size, csize
            except KeyError:
                chunk_idx[id] = incr, size, csize

        def fetch_and_build_idx(archive_id, repository, key):
            chunk_idx = ChunkIndex()
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            add(chunk_idx, archive_id, len(data), len(cdata))
            archive = msgpack.unpackb(data)
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name',))
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
                data = key.decrypt(item_id, chunk)
                add(chunk_idx, item_id, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        print('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    if b'chunks' in item:
                        for chunk_id, size, csize in item[b'chunks']:
                            add(chunk_idx, chunk_id, size, csize)
            if self.do_cache:
                fn = mkpath(archive_id)
                fn_tmp = mkpath(archive_id, suffix='.tmp')
                try:
                    chunk_idx.write(fn_tmp)
                except Exception:
                    os.unlink(fn_tmp)
                else:
                    os.rename(fn_tmp, fn)
            return chunk_idx

        def lookup_name(archive_id):
            for name, info in self.manifest.archives.items():
                if info[b'id'] == archive_id:
                    return name

        def create_master_idx(chunk_idx):
            print('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            print('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
                len(archive_ids), len(cached_ids),
                len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            if archive_ids:
                chunk_idx = None
                for archive_id in archive_ids:
                    archive_name = lookup_name(archive_id)
                    if archive_id in cached_ids:
                        archive_chunk_idx_path = mkpath(archive_id)
                        print("Reading cached archive chunk index for %s ..." % archive_name)
                        archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
                    else:
                        print('Fetching and building archive index for %s ...' % archive_name)
                        archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key)
                    print("Merging into master chunks index ...")
                    if chunk_idx is None:
                        # we just use the first archive's idx as starting point,
                        # to avoid growing the hash table from 0 size and also
                        # to save 1 merge call.
                        chunk_idx = archive_chunk_idx
                    else:
                        chunk_idx.merge(archive_chunk_idx)
            print('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except OSError:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except OSError:
                pass
            try:
                os.mkdir(archive_path)
            except OSError:
                pass

        self.begin_txn()
        repository = cache_if_remote(self.repository)
        legacy_cleanup()
        # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
        # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
        self.do_cache = os.path.isdir(archive_path)
        self.chunks = create_master_idx(self.chunks)
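
    # A sketch of the "TEMPORARY HACK" mentioned in sync() above: replacing the
    # chunks.archive.d directory with a plain file makes os.path.isdir() return
    # False, so do_cache stays False and no per-archive indexes are cached
    # (illustrative only; repo_id_hex and the exact cache path are assumptions):
    #
    #     d = os.path.join(get_cache_dir(), repo_id_hex, 'chunks.archive.d')
    #     shutil.rmtree(d)
    #     open(d, 'w').close()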

    def add_chunk(self, id, data, stats):
        if not self.txn_active:
            self.begin_txn()
        size = len(data)
        if self.seen_chunk(id, size):
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(data)
        csize = len(data)
        self.repository.put(id, data, wait=False)
        self.chunks[id] = (1, size, csize)
        stats.update(size, csize, True)
        return id, size, csize

    def seen_chunk(self, id, size=None):
        refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        self.chunks[id] = (count + 1, size, csize)
        stats.update(size, csize, False)
        return id, size, csize

    def chunk_decref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        if count == 1:
            del self.chunks[id]
            self.repository.delete(id, wait=False)
            stats.update(-size, -csize, True)
        else:
            self.chunks[id] = (count - 1, size, csize)
            stats.update(-size, -csize, False)
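
    # Chunk index entries are (refcount, size, csize) tuples; a minimal sketch of
    # the lifecycle implemented by the methods above (values illustrative only):
    #
    #     cache.add_chunk(id, data, stats)   # new chunk: stored, entry = (1, size, csize)
    #     cache.chunk_incref(id, stats)      # duplicate data: entry = (2, size, csize)
    #     cache.chunk_decref(id, stats)      # entry = (1, size, csize)
    #     cache.chunk_decref(id, stats)      # last reference: entry removed, repository.delete(id)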

    def file_known_and_unchanged(self, path_hash, st):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = msgpack.unpackb(entry)
        if entry[2] == st.st_size and bigint_to_int(entry[3]) == st_mtime_ns(st) and entry[1] == st.st_ino:
            # reset entry age
            entry[0] = 0
            self.files[path_hash] = msgpack.packb(entry)
            return entry[4]
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return
        # Entry: Age, inode, size, mtime, chunk ids
        mtime_ns = st_mtime_ns(st)
        self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
        self._newest_mtime = max(self._newest_mtime, mtime_ns)
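
    # Files cache entry format, as packed by memorize_file() and checked by
    # file_known_and_unchanged() (an illustrative, made-up entry):
    #
    #     (0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids)
    #      age  inode     size       mtime in ns (bigint)     chunk ids of the file
    #
    # _read_files() increments the age on every load, and commit() drops entries
    # whose age has reached 10 or whose mtime is not older than the newest mtime
    # seen in the current run.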