cache.py

import configparser
from .remote import cache_if_remote
from collections import namedtuple
import errno
import os
import stat
import sys
from binascii import hexlify
import shutil
import tarfile
import tempfile

from .key import PlaintextKey
from .logger import create_logger
logger = create_logger()
from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \
    bigint_to_int, format_file_size, have_cython
from .locking import UpgradableLock
from .hashindex import ChunkIndex

if have_cython():
    import msgpack


class Cache:
    """Client Side cache
    """

    class RepositoryReplay(Error):
        """Cache is newer than repository, refusing to continue"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue
        """

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True):
        self.lock = None
        self.timestamp = None
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
        self.do_files = do_files
        # Warn user before sending data to a never-seen-before unencrypted repository
        if not os.path.exists(self.path):
            if warn_if_unencrypted and isinstance(key, PlaintextKey):
                if not self._confirm('Warning: Attempting to access a previously unknown unencrypted repository',
                                     'BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                    raise self.CacheInitAbortedError()
            self.create()
        self.open()
        # Warn user before sending data to a relocated repository
        if self.previous_location and self.previous_location != repository._location.canonical_path():
            msg = 'Warning: The repository at location {} was previously located at {}'.format(
                repository._location.canonical_path(), self.previous_location)
            if not self._confirm(msg, 'BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise self.RepositoryAccessAborted()
        if sync and self.manifest.id != self.manifest_id:
            # If the repository is older than the cache, something fishy is going on
            if self.timestamp and self.timestamp > manifest.timestamp:
                raise self.RepositoryReplay()
            # Make sure an encrypted repository has not been swapped for an unencrypted repository
            if self.key_type is not None and self.key_type != str(key.TYPE):
                raise self.EncryptionMethodMismatch()
            self.sync()
            self.commit()

    def __del__(self):
        self.close()

    def __str__(self):
        return format(self, """\
All archives:   {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}""")

    def __format__(self, format_spec):
        # XXX: this should really be moved down to `hashindex.pyx`
        Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize',
                                         'total_unique_chunks', 'total_chunks'])
        stats = Summary(*self.chunks.summarize())._asdict()
        for field in ['total_size', 'total_csize', 'unique_csize']:
            stats[field] = format_file_size(stats[field])
        stats = Summary(**stats)
        return format_spec.format(stats)
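
    # Illustrative example: given a Cache instance `cache`,
    #   format(cache, "{0.total_chunks} chunks, {0.unique_csize} unique")
    # fills the placeholders from the summarized chunk statistics; __str__ above
    # uses the same mechanism with a full report template.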

    def _confirm(self, message, env_var_override=None):
        print(message, file=sys.stderr)
        if env_var_override and os.environ.get(env_var_override):
            print("Yes (From {})".format(env_var_override), file=sys.stderr)
            return True
        if not sys.stdin.isatty():
            return False
        try:
            answer = input('Do you want to continue? [yN] ')
        except EOFError:
            return False
        return answer and answer in 'Yy'

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write('This is a Borg cache')
        config = configparser.RawConfigParser()
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
        config.set('cache', 'manifest', '')
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with open(os.path.join(self.path, 'files'), 'wb') as fd:
            pass  # empty file
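
    # On-disk layout created above:
    #   README             - marker file
    #   config             - cache metadata (version, repository id, manifest id, ...)
    #   chunks             - the master ChunkIndex
    #   chunks.archive.d/  - per-archive chunk indexes (see sync())
    #   files              - the files cache, a stream of msgpack-packed entries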

    def destroy(self):
        """Destroy the cache at `self.path`
        """
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def _do_open(self):
        self.config = configparser.RawConfigParser()
        config_path = os.path.join(self.path, 'config')
        self.config.read(config_path)
        try:
            cache_version = self.config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
                    config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            raise Exception('%s does not look like a Borg cache.' % config_path)
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache.' % self.path)
        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
        self.rollback()

    def close(self):
        if self.lock:
            self.lock.release()
            self.lock = None

    def _read_files(self):
        self.files = {}
        self._newest_mtime = 0
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    item[0] += 1
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(item)
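
    # Each files cache value is a msgpack-packed list:
    #   [age, inode, size, mtime (as bigint), chunk ids]
    # (see memorize_file() below); _read_files() increments the age on load so
    # that entries not refreshed for a while get dropped in commit().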

    def begin_txn(self):
        # Initialize transaction snapshot
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True
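
    # Transaction protocol: begin_txn() snapshots config, chunks and files into
    # txn.active; commit() writes the new state and removes the snapshot;
    # rollback() restores the snapshot if txn.active is still present, e.g.
    # after an interrupted run.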

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        if self.files is not None:
            with open(os.path.join(self.path, 'files'), 'wb') as fd:
                for path_hash, item in self.files.items():
                    # Discard cached files with the newest mtime to avoid
                    # issues with filesystem snapshots and mtime precision
                    item = msgpack.unpackb(item)
                    if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
                        msgpack.pack((path_hash, item), fd)
        self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with open(os.path.join(self.path, 'config'), 'w') as fd:
            self.config.write(fd)
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch infos from repo and build a chunk index once per backup
        archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
        """
        archive_path = os.path.join(self.path, 'chunks.archive.d')

        def mkpath(id, suffix=''):
            id_hex = hexlify(id).decode('ascii')
            path = os.path.join(archive_path, id_hex + suffix)
            return path.encode('utf-8')

        def cached_archives():
            if self.do_cache:
                fns = os.listdir(archive_path)
                # filenames with 64 hex digits == 256bit
                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
            else:
                return set()

        def repo_archives():
            return set(info[b'id'] for info in self.manifest.archives.values())

        def cleanup_outdated(ids):
            for id in ids:
                os.unlink(mkpath(id))

        def add(chunk_idx, id, size, csize, incr=1):
            try:
                count, size, csize = chunk_idx[id]
                chunk_idx[id] = count + incr, size, csize
            except KeyError:
                chunk_idx[id] = incr, size, csize
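
        # A ChunkIndex entry maps chunk id -> (refcount, size, csize); add()
        # above bumps the refcount for already known chunks and creates the
        # entry otherwise (add_chunk()/chunk_incref() below use the same layout).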

        def fetch_and_build_idx(archive_id, repository, key):
            chunk_idx = ChunkIndex()
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            add(chunk_idx, archive_id, len(data), len(cdata))
            archive = msgpack.unpackb(data)
            if archive[b'version'] != 1:
                raise Exception('Unknown archive metadata version')
            decode_dict(archive, (b'name',))
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
                data = key.decrypt(item_id, chunk)
                add(chunk_idx, item_id, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        logger.error('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    if b'chunks' in item:
                        for chunk_id, size, csize in item[b'chunks']:
                            add(chunk_idx, chunk_id, size, csize)
            if self.do_cache:
                fn = mkpath(archive_id)
                fn_tmp = mkpath(archive_id, suffix='.tmp')
                try:
                    chunk_idx.write(fn_tmp)
                except Exception:
                    os.unlink(fn_tmp)
                else:
                    os.rename(fn_tmp, fn)
            return chunk_idx

        def lookup_name(archive_id):
            for name, info in self.manifest.archives.items():
                if info[b'id'] == archive_id:
                    return name

        def create_master_idx(chunk_idx):
            logger.info('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
                len(archive_ids), len(cached_ids),
                len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            if archive_ids:
                chunk_idx = None
                for archive_id in archive_ids:
                    archive_name = lookup_name(archive_id)
                    if archive_id in cached_ids:
                        archive_chunk_idx_path = mkpath(archive_id)
                        logger.info("Reading cached archive chunk index for %s ..." % archive_name)
                        archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
                    else:
                        logger.info('Fetching and building archive index for %s ...' % archive_name)
                        archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key)
                    logger.info("Merging into master chunks index ...")
                    if chunk_idx is None:
                        # we just use the first archive's idx as starting point,
                        # to avoid growing the hash table from 0 size and also
                        # to save 1 merge call.
                        chunk_idx = archive_chunk_idx
                    else:
                        chunk_idx.merge(archive_chunk_idx)
            logger.info('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except:
                pass
            try:
                os.mkdir(archive_path)
            except:
                pass

        self.begin_txn()
        repository = cache_if_remote(self.repository)
        legacy_cleanup()
        # TEMPORARY HACK: to avoid archive index caching, create a FILE named
        # ~/.cache/borg/REPOID/chunks.archive.d - this is only recommended if you have a
        # fast, low latency connection to your repo (e.g. if repo is local disk)
        self.do_cache = os.path.isdir(archive_path)
        self.chunks = create_master_idx(self.chunks)

    def add_chunk(self, id, data, stats):
        if not self.txn_active:
            self.begin_txn()
        size = len(data)
        if self.seen_chunk(id, size):
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(data)
        csize = len(data)
        self.repository.put(id, data, wait=False)
        self.chunks[id] = (1, size, csize)
        stats.update(size, csize, True)
        return id, size, csize

    def seen_chunk(self, id, size=None):
        refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        self.chunks[id] = (count + 1, size, csize)
        stats.update(size, csize, False)
        return id, size, csize

    def chunk_decref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks[id]
        if count == 1:
            del self.chunks[id]
            self.repository.delete(id, wait=False)
            stats.update(-size, -csize, True)
        else:
            self.chunks[id] = (count - 1, size, csize)
            stats.update(-size, -csize, False)

    def file_known_and_unchanged(self, path_hash, st):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = msgpack.unpackb(entry)
        if entry[2] == st.st_size and bigint_to_int(entry[3]) == st_mtime_ns(st) and entry[1] == st.st_ino:
            # reset entry age
            entry[0] = 0
            self.files[path_hash] = msgpack.packb(entry)
            return entry[4]
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return
        # Entry: Age, inode, size, mtime, chunk ids
        mtime_ns = st_mtime_ns(st)
        self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
        self._newest_mtime = max(self._newest_mtime, mtime_ns)
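
# Minimal usage sketch (illustrative only; `repository`, `key`, `manifest`,
# `stats` and the chunk `id`/`data` are assumed to come from borg's Repository,
# key, Manifest, statistics and chunker machinery and are not defined here):
#
#     cache = Cache(repository, key, manifest, do_files=True)
#     try:
#         chunk_id, size, csize = cache.add_chunk(id, data, stats)
#     finally:
#         cache.commit()
#         cache.close()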