cache.py

  1. import configparser
  2. import os
  3. import shutil
  4. import stat
  5. from binascii import unhexlify
  6. from collections import namedtuple
  7. from time import perf_counter
  8. from .logger import create_logger
  9. logger = create_logger()
  10. files_cache_logger = create_logger("borg.debug.files_cache")
  11. from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED
  12. from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
  13. from .helpers import Location
  14. from .helpers import Error
  15. from .helpers import get_cache_dir, get_security_dir
  16. from .helpers import bin_to_hex, parse_stringified_list
  17. from .helpers import format_file_size
  18. from .helpers import safe_ns
  19. from .helpers import yes
  20. from .helpers import remove_surrogates
  21. from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
  22. from .helpers import set_ec, EXIT_WARNING
  23. from .helpers import safe_unlink
  24. from .helpers import msgpack
  25. from .helpers.msgpack import int_to_timestamp, timestamp_to_int
  26. from .item import ArchiveItem, ChunkListEntry
  27. from .crypto.key import PlaintextKey
  28. from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
  29. from .locking import Lock
  30. from .manifest import Manifest
  31. from .platform import SaveFile
  32. from .remote import cache_if_remote
  33. from .repository import LIST_SCAN_LIMIT
  34. # note: cmtime might be either a ctime or a mtime timestamp
  35. FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
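# Illustrative sketch (not part of borg): how a FileCacheEntry round-trips through msgpack and
# how its "age" field is bumped on every load, mirroring LocalCache._read_files() further below.
# All values here are made up.
def _example_file_cache_entry_roundtrip():
    entry = FileCacheEntry(
        age=0, inode=1234, size=4096, cmtime=int_to_timestamp(1_000_000_000), chunk_ids=[]
    )
    packed = msgpack.packb(entry)                      # what self.files[path_hash] stores
    loaded = FileCacheEntry(*msgpack.unpackb(packed))  # a namedtuple packs/unpacks as a plain array
    return loaded._replace(age=loaded.age + 1)         # aged by one backup run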
  36. class SecurityManager:
  37. """
  38. Tracks repositories. Ensures that nothing bad happens (repository swaps,
  39. replay attacks, unknown repositories etc.).
  40. This is complicated by the Cache being initially used for this, while
  41. only some commands actually use the Cache, which meant that other commands
  42. did not perform these checks.
  43. Further complications were created by the Cache being a cache, so it
  44. could legitimately be deleted, which was annoying because Borg then no longer
  45. recognized such repositories.
  46. Therefore a second location, the security database (see get_security_dir),
  47. was introduced which stores this information. However, this means that
  48. the code has to deal with a cache existing but no security DB entry,
  49. or inconsistencies between the security DB and the cache which have to
  50. be reconciled, and also with no cache existing but a security DB entry.
  51. """
  52. def __init__(self, repository):
  53. self.repository = repository
  54. self.dir = get_security_dir(repository.id_str, legacy=(repository.version == 1))
  55. self.cache_dir = cache_dir(repository)
  56. self.key_type_file = os.path.join(self.dir, "key-type")
  57. self.location_file = os.path.join(self.dir, "location")
  58. self.manifest_ts_file = os.path.join(self.dir, "manifest-timestamp")
  59. @staticmethod
  60. def destroy(repository, path=None):
  61. """destroy the security dir for ``repository`` or at ``path``"""
  62. path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
  63. if os.path.exists(path):
  64. shutil.rmtree(path)
  65. def known(self):
  66. return all(os.path.exists(f) for f in (self.key_type_file, self.location_file, self.manifest_ts_file))
  67. def key_matches(self, key):
  68. if not self.known():
  69. return False
  70. try:
  71. with open(self.key_type_file) as fd:
  72. type = fd.read()
  73. return type == str(key.TYPE)
  74. except OSError as exc:
  75. logger.warning("Could not read/parse key type file: %s", exc)
  76. def save(self, manifest, key):
  77. logger.debug("security: saving state for %s to %s", self.repository.id_str, self.dir)
  78. current_location = self.repository._location.canonical_path()
  79. logger.debug("security: current location %s", current_location)
  80. logger.debug("security: key type %s", str(key.TYPE))
  81. logger.debug("security: manifest timestamp %s", manifest.timestamp)
  82. with SaveFile(self.location_file) as fd:
  83. fd.write(current_location)
  84. with SaveFile(self.key_type_file) as fd:
  85. fd.write(str(key.TYPE))
  86. with SaveFile(self.manifest_ts_file) as fd:
  87. fd.write(manifest.timestamp)
  88. def assert_location_matches(self, cache_config=None):
  89. # Warn user before sending data to a relocated repository
  90. try:
  91. with open(self.location_file) as fd:
  92. previous_location = fd.read()
  93. logger.debug("security: read previous location %r", previous_location)
  94. except FileNotFoundError:
  95. logger.debug("security: previous location file %s not found", self.location_file)
  96. previous_location = None
  97. except OSError as exc:
  98. logger.warning("Could not read previous location file: %s", exc)
  99. previous_location = None
  100. if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location:
  101. # Reconcile cache and security dir; we take the cache location.
  102. previous_location = cache_config.previous_location
  103. logger.debug("security: using previous_location of cache: %r", previous_location)
  104. repository_location = self.repository._location.canonical_path()
  105. if previous_location and previous_location != repository_location:
  106. msg = (
  107. "Warning: The repository at location {} was previously located at {}\n".format(
  108. repository_location, previous_location
  109. )
  110. + "Do you want to continue? [yN] "
  111. )
  112. if not yes(
  113. msg,
  114. false_msg="Aborting.",
  115. invalid_msg="Invalid answer, aborting.",
  116. retry=False,
  117. env_var_override="BORG_RELOCATED_REPO_ACCESS_IS_OK",
  118. ):
  119. raise Cache.RepositoryAccessAborted()
  120. # adapt on-disk config immediately if the new location was accepted
  121. logger.debug("security: updating location stored in cache and security dir")
  122. with SaveFile(self.location_file) as fd:
  123. fd.write(repository_location)
  124. if cache_config:
  125. cache_config.save()
  126. def assert_no_manifest_replay(self, manifest, key, cache_config=None):
  127. try:
  128. with open(self.manifest_ts_file) as fd:
  129. timestamp = fd.read()
  130. logger.debug("security: read manifest timestamp %r", timestamp)
  131. except FileNotFoundError:
  132. logger.debug("security: manifest timestamp file %s not found", self.manifest_ts_file)
  133. timestamp = ""
  134. except OSError as exc:
  135. logger.warning("Could not read manifest timestamp file: %s", exc)
  136. timestamp = ""
  137. if cache_config:
  138. timestamp = max(timestamp, cache_config.timestamp or "")
  139. logger.debug("security: determined newest manifest timestamp as %s", timestamp)
  140. # If the repository is older than the cache or the security dir, something fishy is going on
  141. if timestamp and timestamp > manifest.timestamp:
  142. if isinstance(key, PlaintextKey):
  143. raise Cache.RepositoryIDNotUnique()
  144. else:
  145. raise Cache.RepositoryReplay()
  146. def assert_key_type(self, key, cache_config=None):
  147. # Make sure an encrypted repository has not been swapped for an unencrypted repository
  148. if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE):
  149. raise Cache.EncryptionMethodMismatch()
  150. if self.known() and not self.key_matches(key):
  151. raise Cache.EncryptionMethodMismatch()
  152. def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None):
  153. # warn_if_unencrypted=False is only used for initializing a new repository.
  154. # Thus, we avoid asking about a repository that is currently being initialized.
  155. self.assert_access_unknown(warn_if_unencrypted, manifest, key)
  156. if cache_config:
  157. self._assert_secure(manifest, key, cache_config)
  158. else:
  159. cache_config = CacheConfig(self.repository, lock_wait=lock_wait)
  160. if cache_config.exists():
  161. with cache_config:
  162. self._assert_secure(manifest, key, cache_config)
  163. else:
  164. self._assert_secure(manifest, key)
  165. logger.debug("security: repository checks ok, allowing access")
  166. def _assert_secure(self, manifest, key, cache_config=None):
  167. self.assert_location_matches(cache_config)
  168. self.assert_key_type(key, cache_config)
  169. self.assert_no_manifest_replay(manifest, key, cache_config)
  170. if not self.known():
  171. logger.debug("security: remembering previously unknown repository")
  172. self.save(manifest, key)
  173. def assert_access_unknown(self, warn_if_unencrypted, manifest, key):
  174. # warn_if_unencrypted=False is only used for initializing a new repository.
  175. # Thus, we avoid asking about a repository that is currently being initialized.
  176. if not key.logically_encrypted and not self.known():
  177. msg = (
  178. "Warning: Attempting to access a previously unknown unencrypted repository!\n"
  179. + "Do you want to continue? [yN] "
  180. )
  181. allow_access = not warn_if_unencrypted or yes(
  182. msg,
  183. false_msg="Aborting.",
  184. invalid_msg="Invalid answer, aborting.",
  185. retry=False,
  186. env_var_override="BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK",
  187. )
  188. if allow_access:
  189. if warn_if_unencrypted:
  190. logger.debug("security: remembering unknown unencrypted repository (explicitly allowed)")
  191. else:
  192. logger.debug("security: initializing unencrypted repository")
  193. self.save(manifest, key)
  194. else:
  195. raise Cache.CacheInitAbortedError()
  196. def assert_secure(repository, manifest, lock_wait):
  197. sm = SecurityManager(repository)
  198. sm.assert_secure(manifest, manifest.key, lock_wait=lock_wait)
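# Hypothetical usage sketch (not borg code): a command that already holds a Repository and a
# Manifest could run the pre-access security checks like this; "repository" and "manifest"
# stand in for real borg objects.
def _example_check_before_access(repository, manifest, lock_wait=None):
    # raises if a check fails or the user declines the relocation / unknown-repo prompt
    assert_secure(repository, manifest, lock_wait)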
  199. def recanonicalize_relative_location(cache_location, repository):
  200. # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741).
  201. repo_location = repository._location.canonical_path()
  202. rl = Location(repo_location)
  203. cl = Location(cache_location)
  204. if (
  205. cl.proto == rl.proto
  206. and cl.user == rl.user
  207. and cl.host == rl.host
  208. and cl.port == rl.port
  209. and cl.path
  210. and rl.path
  211. and cl.path.startswith("/~/")
  212. and rl.path.startswith("/./")
  213. and cl.path[3:] == rl.path[3:]
  214. ):
  215. # everything is the same except the expected change in relative path canonicalization;
  216. # update previous_location to avoid a warning / user query about a changed location:
  217. return repo_location
  218. else:
  219. return cache_location
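# Illustrative sketch (not borg code) of the case handled above: an old cache may have stored a
# path like "/~/backups/repo" while the same repository now canonicalizes to "/./backups/repo";
# only the relative-path marker differs, so the stored location is upgraded instead of warning.
def _example_same_relative_repo(cache_path="/~/backups/repo", repo_path="/./backups/repo"):
    return cache_path.startswith("/~/") and repo_path.startswith("/./") and cache_path[3:] == repo_path[3:]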
  220. def cache_dir(repository, path=None):
  221. return path or os.path.join(get_cache_dir(), repository.id_str)
  222. def files_cache_name():
  223. suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
  224. return "files." + suffix if suffix else "files"
  225. def discover_files_cache_name(path):
  226. return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
  227. class CacheConfig:
  228. def __init__(self, repository, path=None, lock_wait=None):
  229. self.repository = repository
  230. self.path = cache_dir(repository, path)
  231. logger.debug("Using %s as cache", self.path)
  232. self.config_path = os.path.join(self.path, "config")
  233. self.lock = None
  234. self.lock_wait = lock_wait
  235. def __enter__(self):
  236. self.open()
  237. return self
  238. def __exit__(self, exc_type, exc_val, exc_tb):
  239. self.close()
  240. def exists(self):
  241. return os.path.exists(self.config_path)
  242. def create(self):
  243. assert not self.exists()
  244. config = configparser.ConfigParser(interpolation=None)
  245. config.add_section("cache")
  246. config.set("cache", "version", "1")
  247. config.set("cache", "repository", self.repository.id_str)
  248. config.set("cache", "manifest", "")
  249. config.add_section("integrity")
  250. config.set("integrity", "manifest", "")
  251. with SaveFile(self.config_path) as fd:
  252. config.write(fd)
  253. def open(self):
  254. self.lock = Lock(os.path.join(self.path, "lock"), exclusive=True, timeout=self.lock_wait).acquire()
  255. self.load()
  256. def load(self):
  257. self._config = configparser.ConfigParser(interpolation=None)
  258. with open(self.config_path) as fd:
  259. self._config.read_file(fd)
  260. self._check_upgrade(self.config_path)
  261. self.id = self._config.get("cache", "repository")
  262. self.manifest_id = unhexlify(self._config.get("cache", "manifest"))
  263. self.timestamp = self._config.get("cache", "timestamp", fallback=None)
  264. self.key_type = self._config.get("cache", "key_type", fallback=None)
  265. self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback="")))
  266. self.mandatory_features = set(
  267. parse_stringified_list(self._config.get("cache", "mandatory_features", fallback=""))
  268. )
  269. try:
  270. self.integrity = dict(self._config.items("integrity"))
  271. if self._config.get("cache", "manifest") != self.integrity.pop("manifest"):
  272. # The cache config file is updated in place (parsed with ConfigParser, the ConfigParser state
  273. # is modified and then written out), not re-created.
  274. # Thus, older Borg versions will leave our [integrity] section alone, making the section's data invalid.
  275. # Therefore, we also store the manifest ID in this section and
  276. # can detect whether an older version interfered by comparing the manifest IDs of this section
  277. # and the main [cache] section.
  278. self.integrity = {}
  279. logger.warning("Cache integrity data not available: old Borg version modified the cache.")
  280. except configparser.NoSectionError:
  281. logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.")
  282. self.integrity = {}
  283. previous_location = self._config.get("cache", "previous_location", fallback=None)
  284. if previous_location:
  285. self.previous_location = recanonicalize_relative_location(previous_location, self.repository)
  286. else:
  287. self.previous_location = None
  288. self._config.set("cache", "previous_location", self.repository._location.canonical_path())
  289. def save(self, manifest=None, key=None):
  290. if manifest:
  291. self._config.set("cache", "manifest", manifest.id_str)
  292. self._config.set("cache", "timestamp", manifest.timestamp)
  293. self._config.set("cache", "ignored_features", ",".join(self.ignored_features))
  294. self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features))
  295. if not self._config.has_section("integrity"):
  296. self._config.add_section("integrity")
  297. for file, integrity_data in self.integrity.items():
  298. self._config.set("integrity", file, integrity_data)
  299. self._config.set("integrity", "manifest", manifest.id_str)
  300. if key:
  301. self._config.set("cache", "key_type", str(key.TYPE))
  302. with SaveFile(self.config_path) as fd:
  303. self._config.write(fd)
  304. def close(self):
  305. if self.lock is not None:
  306. self.lock.release()
  307. self.lock = None
  308. def _check_upgrade(self, config_path):
  309. try:
  310. cache_version = self._config.getint("cache", "version")
  311. wanted_version = 1
  312. if cache_version != wanted_version:
  313. self.close()
  314. raise Exception(
  315. "%s has unexpected cache version %d (wanted: %d)." % (config_path, cache_version, wanted_version)
  316. )
  317. except configparser.NoSectionError:
  318. self.close()
  319. raise Exception("%s does not look like a Borg cache." % config_path) from None
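# Hypothetical usage sketch (not borg code); "repository" stands in for a real borg Repository.
# CacheConfig is a context manager: open() takes the cache lock and loads the config,
# close() releases the lock again.
def _example_read_cache_config(repository):
    cache_config = CacheConfig(repository)
    if not cache_config.exists():
        return None
    with cache_config:
        return {
            "repository_id": cache_config.id,
            "manifest_id": cache_config.manifest_id,
            "previous_location": cache_config.previous_location,
        }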
  320. class Cache:
  321. """Client Side cache"""
  322. class RepositoryIDNotUnique(Error):
  323. """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""
  324. class RepositoryReplay(Error):
  325. """Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""
  326. class CacheInitAbortedError(Error):
  327. """Cache initialization aborted"""
  328. class RepositoryAccessAborted(Error):
  329. """Repository access aborted"""
  330. class EncryptionMethodMismatch(Error):
  331. """Repository encryption method changed since last access, refusing to continue"""
  332. @staticmethod
  333. def break_lock(repository, path=None):
  334. path = cache_dir(repository, path)
  335. Lock(os.path.join(path, "lock"), exclusive=True).break_lock()
  336. @staticmethod
  337. def destroy(repository, path=None):
  338. """destroy the cache for ``repository`` or at ``path``"""
  339. path = path or os.path.join(get_cache_dir(), repository.id_str)
  340. config = os.path.join(path, "config")
  341. if os.path.exists(config):
  342. os.remove(config) # kill config first
  343. shutil.rmtree(path)
  344. def __new__(
  345. cls,
  346. repository,
  347. manifest,
  348. path=None,
  349. sync=True,
  350. warn_if_unencrypted=True,
  351. progress=False,
  352. lock_wait=None,
  353. permit_adhoc_cache=False,
  354. cache_mode=FILES_CACHE_MODE_DISABLED,
  355. iec=False,
  356. ):
  357. def local():
  358. return LocalCache(
  359. manifest=manifest,
  360. path=path,
  361. sync=sync,
  362. warn_if_unencrypted=warn_if_unencrypted,
  363. progress=progress,
  364. iec=iec,
  365. lock_wait=lock_wait,
  366. cache_mode=cache_mode,
  367. )
  368. def adhoc():
  369. return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)
  370. if not permit_adhoc_cache:
  371. return local()
  372. # The ad-hoc cache may be permitted, but if the local cache is in sync it would be wasteful to
  373. # invalidate it by needlessly using the ad-hoc cache.
  374. # Check if the local cache exists and is in sync.
  375. cache_config = CacheConfig(repository, path, lock_wait)
  376. if cache_config.exists():
  377. with cache_config:
  378. cache_in_sync = cache_config.manifest_id == manifest.id
  379. # Don't nest cache locks
  380. if cache_in_sync:
  381. # Local cache is in sync, use it
  382. logger.debug("Cache: choosing local cache (in sync)")
  383. return local()
  384. logger.debug("Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)")
  385. return adhoc()
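# Hypothetical sketch of the dispatch above (not borg code): even with permit_adhoc_cache=True,
# Cache() prefers the persistent LocalCache when it is already in sync with the manifest and
# only falls back to the non-persistent AdHocCache otherwise.
def _example_open_cache(repository, manifest):
    return Cache(repository, manifest, permit_adhoc_cache=True)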
  386. class CacheStatsMixin:
  387. str_format = """\
  388. Original size: {0.total_size}
  389. Deduplicated size: {0.unique_size}
  390. Unique chunks: {0.total_unique_chunks}
  391. Total chunks: {0.total_chunks}
  392. """
  393. def __init__(self, iec=False):
  394. self.iec = iec
  395. def __str__(self):
  396. return self.str_format.format(self.format_tuple())
  397. Summary = namedtuple("Summary", ["total_size", "unique_size", "total_unique_chunks", "total_chunks"])
  398. def stats(self):
  399. from .archive import Archive
  400. # XXX: this should really be moved down to `hashindex.pyx`
  401. total_size, unique_size, total_unique_chunks, total_chunks = self.chunks.summarize()
  402. # since borg 1.2 we have new archive metadata telling the total size per archive,
  403. # so we can just sum up all archives to get the "all archives" stats:
  404. total_size = 0
  405. for archive_name in self.manifest.archives:
  406. archive = Archive(self.manifest, archive_name)
  407. stats = archive.calc_stats(self, want_unique=False)
  408. total_size += stats.osize
  409. stats = self.Summary(total_size, unique_size, total_unique_chunks, total_chunks)._asdict()
  410. return stats
  411. def format_tuple(self):
  412. stats = self.stats()
  413. for field in ["total_size", "unique_size"]:
  414. stats[field] = format_file_size(stats[field], iec=self.iec)
  415. return self.Summary(**stats)
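# Illustrative sketch (not borg code) of how the stats template above is rendered; the numbers
# are made up and already formatted the way format_tuple() would format them.
def _example_render_stats():
    fake = CacheStatsMixin.Summary(
        total_size="10.49 GB", unique_size="2.62 GB", total_unique_chunks=1234, total_chunks=5678
    )
    return CacheStatsMixin.str_format.format(fake)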
  416. class LocalCache(CacheStatsMixin):
  417. """
  418. Persistent, local (client-side) cache.
  419. """
  420. def __init__(
  421. self,
  422. manifest,
  423. path=None,
  424. sync=True,
  425. warn_if_unencrypted=True,
  426. progress=False,
  427. lock_wait=None,
  428. cache_mode=FILES_CACHE_MODE_DISABLED,
  429. iec=False,
  430. ):
  431. """
  432. :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
  433. :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
  434. :param sync: do :meth:`.sync`
  435. :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
  436. """
  437. CacheStatsMixin.__init__(self, iec=iec)
  438. assert isinstance(manifest, Manifest)
  439. self.manifest = manifest
  440. self.repository = manifest.repository
  441. self.key = manifest.key
  442. self.repo_objs = manifest.repo_objs
  443. self.progress = progress
  444. self.cache_mode = cache_mode
  445. self.timestamp = None
  446. self.txn_active = False
  447. self.path = cache_dir(self.repository, path)
  448. self.security_manager = SecurityManager(self.repository)
  449. self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
  450. # Warn user before sending data to a never seen before unencrypted repository
  451. if not os.path.exists(self.path):
  452. self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
  453. self.create()
  454. self.open()
  455. try:
  456. self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
  457. if not self.check_cache_compatibility():
  458. self.wipe_cache()
  459. self.update_compatibility()
  460. if sync and self.manifest.id != self.cache_config.manifest_id:
  461. self.sync()
  462. self.commit()
  463. except: # noqa
  464. self.close()
  465. raise
  466. def __enter__(self):
  467. return self
  468. def __exit__(self, exc_type, exc_val, exc_tb):
  469. self.close()
  470. def create(self):
  471. """Create a new empty cache at `self.path`"""
  472. os.makedirs(self.path)
  473. with open(os.path.join(self.path, "README"), "w") as fd:
  474. fd.write(CACHE_README)
  475. self.cache_config.create()
  476. ChunkIndex().write(os.path.join(self.path, "chunks"))
  477. os.makedirs(os.path.join(self.path, "chunks.archive.d"))
  478. with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
  479. pass # empty file
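# Illustrative note (not code): after create(), the cache directory roughly looks like this:
#   README              - CACHE_README text
#   config              - written by CacheConfig.create()
#   chunks              - empty ChunkIndex
#   chunks.archive.d/   - per-archive chunk indexes, filled by sync()
#   files               - empty files cache ("files.<suffix>" if BORG_FILES_CACHE_SUFFIX is set)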
  480. def _do_open(self):
  481. self.cache_config.load()
  482. with IntegrityCheckedFile(
  483. path=os.path.join(self.path, "chunks"),
  484. write=False,
  485. integrity_data=self.cache_config.integrity.get("chunks"),
  486. ) as fd:
  487. self.chunks = ChunkIndex.read(fd)
  488. if "d" in self.cache_mode: # d(isabled)
  489. self.files = None
  490. else:
  491. self._read_files()
  492. def open(self):
  493. if not os.path.isdir(self.path):
  494. raise Exception("%s does not look like a Borg cache" % self.path)
  495. self.cache_config.open()
  496. self.rollback()
  497. def close(self):
  498. if self.cache_config is not None:
  499. self.cache_config.close()
  500. self.cache_config = None
  501. def _read_files(self):
  502. self.files = {}
  503. self._newest_cmtime = None
  504. logger.debug("Reading files cache ...")
  505. files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
  506. msg = None
  507. try:
  508. with IntegrityCheckedFile(
  509. path=os.path.join(self.path, files_cache_name()),
  510. write=False,
  511. integrity_data=self.cache_config.integrity.get(files_cache_name()),
  512. ) as fd:
  513. u = msgpack.Unpacker(use_list=True)
  514. while True:
  515. data = fd.read(64 * 1024)
  516. if not data:
  517. break
  518. u.feed(data)
  519. try:
  520. for path_hash, item in u:
  521. entry = FileCacheEntry(*item)
  522. # in the end, this takes about 240 bytes per file
  523. self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
  524. except (TypeError, ValueError) as exc:
  525. msg = "The files cache seems invalid. [%s]" % str(exc)
  526. break
  527. except OSError as exc:
  528. msg = "The files cache can't be read. [%s]" % str(exc)
  529. except FileIntegrityError as fie:
  530. msg = "The files cache is corrupted. [%s]" % str(fie)
  531. if msg is not None:
  532. logger.warning(msg)
  533. logger.warning("Continuing without files cache - expect lower performance.")
  534. self.files = {}
  535. files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
  536. def begin_txn(self):
  537. # Initialize transaction snapshot
  538. pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
  539. txn_dir = os.path.join(self.path, "txn.tmp")
  540. os.mkdir(txn_dir)
  541. pi.output("Initializing cache transaction: Reading config")
  542. shutil.copy(os.path.join(self.path, "config"), txn_dir)
  543. pi.output("Initializing cache transaction: Reading chunks")
  544. shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
  545. pi.output("Initializing cache transaction: Reading files")
  546. try:
  547. shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
  548. except FileNotFoundError:
  549. with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
  550. pass # empty file
  551. os.replace(txn_dir, os.path.join(self.path, "txn.active"))
  552. self.txn_active = True
  553. pi.finish()
  554. def commit(self):
  555. """Commit transaction"""
  556. if not self.txn_active:
  557. return
  558. self.security_manager.save(self.manifest, self.key)
  559. pi = ProgressIndicatorMessage(msgid="cache.commit")
  560. if self.files is not None:
  561. if self._newest_cmtime is None:
  562. # was never set because no files were modified/added
  563. self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
  564. ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
  565. pi.output("Saving files cache")
  566. files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
  567. with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
  568. entry_count = 0
  569. for path_hash, item in self.files.items():
  570. # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
  571. # this is to avoid issues with filesystem snapshots and cmtime granularity.
  572. # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
  573. entry = FileCacheEntry(*msgpack.unpackb(item))
  574. if (
  575. entry.age == 0
  576. and timestamp_to_int(entry.cmtime) < self._newest_cmtime
  577. or entry.age > 0
  578. and entry.age < ttl
  579. ):
  580. msgpack.pack((path_hash, entry), fd)
  581. entry_count += 1
  582. files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
  583. files_cache_logger.debug(
  584. "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
  585. )
  586. files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
  587. self.cache_config.integrity[files_cache_name()] = fd.integrity_data
  588. pi.output("Saving chunks cache")
  589. with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
  590. self.chunks.write(fd)
  591. self.cache_config.integrity["chunks"] = fd.integrity_data
  592. pi.output("Saving cache config")
  593. self.cache_config.save(self.manifest, self.key)
  594. os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
  595. shutil.rmtree(os.path.join(self.path, "txn.tmp"))
  596. self.txn_active = False
  597. pi.finish()
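# Illustrative restatement (not borg code) of the retention rule applied above, with explicit
# parentheses since "and" binds tighter than "or":
#   keep = (entry.age == 0 and timestamp_to_int(entry.cmtime) < self._newest_cmtime) \
#          or (0 < entry.age < ttl)
# i.e. keep entries seen in this backup whose cmtime is safely older than the newest cmtime seen,
# plus entries from earlier backups that have not yet reached BORG_FILES_CACHE_TTL.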
  598. def rollback(self):
  599. """Roll back partial and aborted transactions"""
  600. # Remove partial transaction
  601. if os.path.exists(os.path.join(self.path, "txn.tmp")):
  602. shutil.rmtree(os.path.join(self.path, "txn.tmp"))
  603. # Roll back active transaction
  604. txn_dir = os.path.join(self.path, "txn.active")
  605. if os.path.exists(txn_dir):
  606. shutil.copy(os.path.join(txn_dir, "config"), self.path)
  607. shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
  608. shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
  609. txn_tmp = os.path.join(self.path, "txn.tmp")
  610. os.replace(txn_dir, txn_tmp)
  611. if os.path.exists(txn_tmp):
  612. shutil.rmtree(txn_tmp)
  613. self.txn_active = False
  614. self._do_open()
  615. def sync(self):
  616. """Re-synchronize chunks cache with repository.
  617. Maintains a directory with known backup archive indexes, so it only
  618. needs to fetch info from the repo and build a chunk index once per backup
  619. archive.
  620. If out of sync, missing archive indexes get added, outdated indexes
  621. get removed and a new master chunks index is built by merging all
  622. archive indexes.
  623. """
  624. archive_path = os.path.join(self.path, "chunks.archive.d")
  625. # Instrumentation
  626. processed_item_metadata_bytes = 0
  627. processed_item_metadata_chunks = 0
  628. compact_chunks_archive_saved_space = 0
  629. def mkpath(id, suffix=""):
  630. id_hex = bin_to_hex(id)
  631. path = os.path.join(archive_path, id_hex + suffix)
  632. return path
  633. def cached_archives():
  634. if self.do_cache:
  635. fns = os.listdir(archive_path)
  636. # filenames with 64 hex digits == 256 bits,
  637. # or compact indices which are 64 hex digits + ".compact"
  638. return {unhexlify(fn) for fn in fns if len(fn) == 64} | {
  639. unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith(".compact")
  640. }
  641. else:
  642. return set()
  643. def repo_archives():
  644. return {info.id for info in self.manifest.archives.list()}
  645. def cleanup_outdated(ids):
  646. for id in ids:
  647. cleanup_cached_archive(id)
  648. def cleanup_cached_archive(id, cleanup_compact=True):
  649. try:
  650. os.unlink(mkpath(id))
  651. os.unlink(mkpath(id) + ".integrity")
  652. except FileNotFoundError:
  653. pass
  654. if not cleanup_compact:
  655. return
  656. try:
  657. os.unlink(mkpath(id, suffix=".compact"))
  658. os.unlink(mkpath(id, suffix=".compact") + ".integrity")
  659. except FileNotFoundError:
  660. pass
  661. def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
  662. nonlocal processed_item_metadata_bytes
  663. nonlocal processed_item_metadata_chunks
  664. csize, data = decrypted_repository.get(archive_id)
  665. chunk_idx.add(archive_id, 1, len(data))
  666. archive, verified, _ = self.key.unpack_and_verify_archive(data, force_tam_not_required=True)
  667. archive = ArchiveItem(internal_dict=archive)
  668. if archive.version not in (1, 2): # legacy
  669. raise Exception("Unknown archive metadata version")
  670. if archive.version == 1:
  671. items = archive.items
  672. elif archive.version == 2:
  673. items = []
  674. for chunk_id, (csize, data) in zip(archive.item_ptrs, decrypted_repository.get_many(archive.item_ptrs)):
  675. chunk_idx.add(chunk_id, 1, len(data))
  676. ids = msgpack.unpackb(data)
  677. items.extend(ids)
  678. sync = CacheSynchronizer(chunk_idx)
  679. for item_id, (csize, data) in zip(items, decrypted_repository.get_many(items)):
  680. chunk_idx.add(item_id, 1, len(data))
  681. processed_item_metadata_bytes += len(data)
  682. processed_item_metadata_chunks += 1
  683. sync.feed(data)
  684. if self.do_cache:
  685. write_archive_index(archive_id, chunk_idx)
  686. def write_archive_index(archive_id, chunk_idx):
  687. nonlocal compact_chunks_archive_saved_space
  688. compact_chunks_archive_saved_space += chunk_idx.compact()
  689. fn = mkpath(archive_id, suffix=".compact")
  690. fn_tmp = mkpath(archive_id, suffix=".tmp")
  691. try:
  692. with DetachedIntegrityCheckedFile(
  693. path=fn_tmp, write=True, filename=bin_to_hex(archive_id) + ".compact"
  694. ) as fd:
  695. chunk_idx.write(fd)
  696. except Exception:
  697. safe_unlink(fn_tmp)
  698. else:
  699. os.replace(fn_tmp, fn)
  700. def read_archive_index(archive_id, archive_name):
  701. archive_chunk_idx_path = mkpath(archive_id)
  702. logger.info("Reading cached archive chunk index for %s", archive_name)
  703. try:
  704. try:
  705. # Attempt to load compact index first
  706. with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + ".compact", write=False) as fd:
  707. archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
  708. # In case a non-compact index exists, delete it.
  709. cleanup_cached_archive(archive_id, cleanup_compact=False)
  710. # Compact index read - return index, no conversion necessary (below).
  711. return archive_chunk_idx
  712. except FileNotFoundError:
  713. # No compact index found, load non-compact index, and convert below.
  714. with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
  715. archive_chunk_idx = ChunkIndex.read(fd)
  716. except FileIntegrityError as fie:
  717. logger.error("Cached archive chunk index of %s is corrupted: %s", archive_name, fie)
  718. # Delete corrupted index, set warning. A new index must be built.
  719. cleanup_cached_archive(archive_id)
  720. set_ec(EXIT_WARNING)
  721. return None
  722. # Convert to compact index. Delete the existing index first.
  723. logger.debug("Found non-compact index for %s, converting to compact.", archive_name)
  724. cleanup_cached_archive(archive_id)
  725. write_archive_index(archive_id, archive_chunk_idx)
  726. return archive_chunk_idx
  727. def get_archive_ids_to_names(archive_ids):
  728. # Pass once over all archives and build a mapping from ids to names.
  729. # The easier approach, doing a similar loop for each archive, has
  730. # quadratic complexity and makes about a dozen million function calls
  731. # with 1100 archives (which alone takes about 30 CPU seconds).
  732. archive_names = {}
  733. for info in self.manifest.archives.list():
  734. if info.id in archive_ids:
  735. archive_names[info.id] = info.name
  736. assert len(archive_names) == len(archive_ids)
  737. return archive_names
  738. def create_master_idx(chunk_idx):
  739. logger.debug("Synchronizing chunks index...")
  740. cached_ids = cached_archives()
  741. archive_ids = repo_archives()
  742. logger.info(
  743. "Cached archive chunk indexes: %d fresh, %d stale, %d need fetching.",
  744. len(archive_ids & cached_ids),
  745. len(cached_ids - archive_ids),
  746. len(archive_ids - cached_ids),
  747. )
  748. # deallocates old hashindex, creates empty hashindex:
  749. chunk_idx.clear()
  750. cleanup_outdated(cached_ids - archive_ids)
  751. # Explicitly set the usable initial hash table capacity to avoid performance issues
  752. # due to hash table "resonance".
  753. master_index_capacity = len(self.repository)
  754. if archive_ids:
  755. chunk_idx = None if not self.do_cache else ChunkIndex(usable=master_index_capacity)
  756. pi = ProgressIndicatorPercent(
  757. total=len(archive_ids),
  758. step=0.1,
  759. msg="%3.0f%% Syncing chunks index. Processing archive %s.",
  760. msgid="cache.sync",
  761. )
  762. archive_ids_to_names = get_archive_ids_to_names(archive_ids)
  763. for archive_id, archive_name in archive_ids_to_names.items():
  764. pi.show(info=[remove_surrogates(archive_name)]) # legacy; borg2 always has pure unicode archive names.
  765. if self.do_cache:
  766. if archive_id in cached_ids:
  767. archive_chunk_idx = read_archive_index(archive_id, archive_name)
  768. if archive_chunk_idx is None:
  769. cached_ids.remove(archive_id)
  770. if archive_id not in cached_ids:
  771. # Do not make this an else branch; the FileIntegrityError exception handler
  772. # above can remove *archive_id* from *cached_ids*.
  773. logger.info("Fetching and building archive index for %s.", archive_name)
  774. archive_chunk_idx = ChunkIndex()
  775. fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
  776. logger.debug("Merging into master chunks index.")
  777. chunk_idx.merge(archive_chunk_idx)
  778. else:
  779. chunk_idx = chunk_idx or ChunkIndex(usable=master_index_capacity)
  780. logger.info("Fetching archive index for %s.", archive_name)
  781. fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
  782. pi.finish()
  783. logger.debug(
  784. "Chunks index sync: processed %s (%d chunks) of metadata.",
  785. format_file_size(processed_item_metadata_bytes),
  786. processed_item_metadata_chunks,
  787. )
  788. logger.debug(
  789. "Chunks index sync: compact chunks.archive.d storage saved %s bytes.",
  790. format_file_size(compact_chunks_archive_saved_space),
  791. )
  792. logger.debug("Chunks index sync done.")
  793. return chunk_idx
  794. # The cache can be used by a command that e.g. only checks against Manifest.Operation.WRITE,
  795. # which does not have to include all flags from Manifest.Operation.READ.
  796. # Since the sync will attempt to read archives, check compatibility with Manifest.Operation.READ.
  797. self.manifest.check_repository_compatibility((Manifest.Operation.READ,))
  798. self.begin_txn()
  799. with cache_if_remote(self.repository, decrypted_cache=self.repo_objs) as decrypted_repository:
  800. # TEMPORARY HACK:
  801. # to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
  802. # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local).
  803. self.do_cache = os.path.isdir(archive_path)
  804. self.chunks = create_master_idx(self.chunks)
  805. def check_cache_compatibility(self):
  806. my_features = Manifest.SUPPORTED_REPO_FEATURES
  807. if self.cache_config.ignored_features & my_features:
  808. # The cache might not contain references to chunks that need a feature that is mandatory for some operation
  809. # and which this version supports. To avoid corruption while executing that operation, force a rebuild.
  810. return False
  811. if not self.cache_config.mandatory_features <= my_features:
  812. # The cache was built with consideration for at least one feature that this version does not understand.
  813. # This client might misinterpret the cache. Thus, force a rebuild.
  814. return False
  815. return True
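# Tiny illustration (made-up feature names) of the two set checks above:
#   my_features = {"feature_a", "feature_b"}
#   cache.ignored_features & my_features      truthy -> a formerly ignored feature is now supported -> rebuild
#   cache.mandatory_features <= my_features   False  -> the cache relies on a feature we don't know -> rebuild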
  816. def wipe_cache(self):
  817. logger.warning("Discarding incompatible cache and forcing a cache rebuild")
  818. archive_path = os.path.join(self.path, "chunks.archive.d")
  819. if os.path.isdir(archive_path):
  820. shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
  821. os.makedirs(os.path.join(self.path, "chunks.archive.d"))
  822. self.chunks = ChunkIndex()
  823. with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
  824. pass # empty file
  825. self.cache_config.manifest_id = ""
  826. self.cache_config._config.set("cache", "manifest", "")
  827. self.cache_config.ignored_features = set()
  828. self.cache_config.mandatory_features = set()
  829. def update_compatibility(self):
  830. operation_to_features_map = self.manifest.get_all_mandatory_features()
  831. my_features = Manifest.SUPPORTED_REPO_FEATURES
  832. repo_features = set()
  833. for operation, features in operation_to_features_map.items():
  834. repo_features.update(features)
  835. self.cache_config.ignored_features.update(repo_features - my_features)
  836. self.cache_config.mandatory_features.update(repo_features & my_features)
  837. def add_chunk(
  838. self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None, ctype=None, clevel=None
  839. ):
  840. if not self.txn_active:
  841. self.begin_txn()
  842. if size is None and compress:
  843. size = len(data) # data is still uncompressed
  844. refcount = self.seen_chunk(id, size)
  845. if refcount and not overwrite:
  846. return self.chunk_incref(id, stats)
  847. if size is None:
  848. raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
  849. cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel)
  850. self.repository.put(id, cdata, wait=wait)
  851. self.chunks.add(id, 1, size)
  852. stats.update(size, not refcount)
  853. return ChunkListEntry(id, size)
  854. def seen_chunk(self, id, size=None):
  855. refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
  856. if size is not None and stored_size is not None and size != stored_size:
  857. # we already have a chunk with that id, but different size.
  858. # this is either a hash collision (unlikely) or corruption or a bug.
  859. raise Exception(
  860. "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
  861. )
  862. return refcount
  863. def chunk_incref(self, id, stats, size=None):
  864. if not self.txn_active:
  865. self.begin_txn()
  866. count, _size = self.chunks.incref(id)
  867. stats.update(_size, False)
  868. return ChunkListEntry(id, _size)
  869. def chunk_decref(self, id, stats, wait=True):
  870. if not self.txn_active:
  871. self.begin_txn()
  872. count, size = self.chunks.decref(id)
  873. if count == 0:
  874. del self.chunks[id]
  875. self.repository.delete(id, wait=wait)
  876. stats.update(-size, True)
  877. else:
  878. stats.update(-size, False)
  879. def file_known_and_unchanged(self, hashed_path, path_hash, st):
  880. """
  881. Check if we know the file that has this path_hash (know == it is in our files cache) and
  882. whether it is unchanged (the size/inode number/cmtime is the same for the attributes we check in this cache_mode).
  883. :param hashed_path: the file's path as we gave it to hash(hashed_path)
  884. :param path_hash: hash(hashed_path), to save some memory in the files cache
  885. :param st: the file's stat() result
  886. :return: known, ids (known is True if we have infos about this file in the cache,
  887. ids is the list of chunk ids IF the file has not changed, otherwise None).
  888. """
  889. if not stat.S_ISREG(st.st_mode):
  890. return False, None
  891. cache_mode = self.cache_mode
  892. if "d" in cache_mode: # d(isabled)
  893. files_cache_logger.debug("UNKNOWN: files cache disabled")
  894. return False, None
  895. # note: r(echunk) does not need the files cache in this method, but the files cache will
  896. # be updated and saved to disk to memorize the files. To preserve previous generations in
  897. # the cache, this means that it also needs to get loaded from disk first.
  898. if "r" in cache_mode: # r(echunk)
  899. files_cache_logger.debug("UNKNOWN: rechunking enforced")
  900. return False, None
  901. entry = self.files.get(path_hash)
  902. if not entry:
  903. files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
  904. return False, None
  905. # we know the file!
  906. entry = FileCacheEntry(*msgpack.unpackb(entry))
  907. if "s" in cache_mode and entry.size != st.st_size:
  908. files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
  909. return True, None
  910. if "i" in cache_mode and entry.inode != st.st_ino:
  911. files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
  912. return True, None
  913. if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
  914. files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
  915. return True, None
  916. elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
  917. files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
  918. return True, None
  919. # we ignored the inode number in the comparison above, or it is still the same.
  920. # if it is still the same, replacing it in the tuple doesn't change it.
  921. # if we ignored it, a reason for doing that is that files were moved to a new
  922. # disk / new fs (so a one-time change of inode number is expected) and we wanted
  923. # to avoid everything getting chunked again. to be able to re-enable the inode
  924. # number comparison in a future backup run (and avoid chunking everything
  925. # again at that time), we need to update the inode number in the cache with what
  926. # we see in the filesystem.
  927. self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
  928. return True, entry.chunk_ids
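# Summary of the cache_mode letters used above and in memorize_file() below:
#   d - disabled (no files cache), r - force rechunking (treat every file as unknown),
#   s - compare st_size, i - compare st_ino, c - compare ctime, m - compare mtime.
# For example, a cache_mode containing "c", "s" and "i" compares ctime, size and inode number.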
  929. def memorize_file(self, hashed_path, path_hash, st, ids):
  930. if not stat.S_ISREG(st.st_mode):
  931. return
  932. cache_mode = self.cache_mode
  933. # note: r(echunk) modes will update the files cache, d(isabled) mode won't
  934. if "d" in cache_mode:
  935. files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
  936. return
  937. if "c" in cache_mode:
  938. cmtime_type = "ctime"
  939. cmtime_ns = safe_ns(st.st_ctime_ns)
  940. elif "m" in cache_mode:
  941. cmtime_type = "mtime"
  942. cmtime_ns = safe_ns(st.st_mtime_ns)
  943. else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
  944. cmtime_type = "ctime"
  945. cmtime_ns = safe_ns(st.st_ctime_ns)
  946. entry = FileCacheEntry(
  947. age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids
  948. )
  949. self.files[path_hash] = msgpack.packb(entry)
  950. self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
  951. files_cache_logger.debug(
  952. "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
  953. entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)),
  954. cmtime_type,
  955. hashed_path,
  956. )
  957. class AdHocCache(CacheStatsMixin):
  958. """
  959. Ad-hoc, non-persistent cache.
  960. Compared to the standard LocalCache, the AdHocCache does not maintain accurate reference counts,
  961. nor does it provide a files cache (which would require persistence). Chunks that were not added
  962. during the current AdHocCache lifetime won't have their correct size set (0 bytes) and will
  963. have an infinite reference count (MAX_VALUE).
  964. """
  965. str_format = """\
  966. All archives: unknown unknown unknown
  967. Unique chunks Total chunks
  968. Chunk index: {0.total_unique_chunks:20d} unknown"""
  969. def __init__(self, manifest, warn_if_unencrypted=True, lock_wait=None, iec=False):
  970. CacheStatsMixin.__init__(self, iec=iec)
  971. assert isinstance(manifest, Manifest)
  972. self.manifest = manifest
  973. self.repository = manifest.repository
  974. self.key = manifest.key
  975. self.repo_objs = manifest.repo_objs
  976. self._txn_active = False
  977. self.security_manager = SecurityManager(self.repository)
  978. self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait)
  979. logger.warning("Note: --no-cache-sync is an experimental feature.")
  980. # Public API
  981. def __enter__(self):
  982. return self
  983. def __exit__(self, exc_type, exc_val, exc_tb):
  984. pass
  985. files = None # type: ignore
  986. cache_mode = "d"
  987. def file_known_and_unchanged(self, hashed_path, path_hash, st):
  988. files_cache_logger.debug("UNKNOWN: files cache not implemented")
  989. return False, None
  990. def memorize_file(self, hashed_path, path_hash, st, ids):
  991. pass
  992. def add_chunk(self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None):
  993. assert not overwrite, "AdHocCache does not permit overwrites — trying to use it for recreate?"
  994. if not self._txn_active:
  995. self.begin_txn()
  996. if size is None and compress:
  997. size = len(data) # data is still uncompressed
  998. if size is None:
  999. raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
  1000. refcount = self.seen_chunk(id, size)
  1001. if refcount:
  1002. return self.chunk_incref(id, stats, size=size)
  1003. cdata = self.repo_objs.format(id, meta, data, compress=compress)
  1004. self.repository.put(id, cdata, wait=wait)
  1005. self.chunks.add(id, 1, size)
  1006. stats.update(size, not refcount)
  1007. return ChunkListEntry(id, size)
  1008. def seen_chunk(self, id, size=None):
  1009. if not self._txn_active:
  1010. self.begin_txn()
  1011. entry = self.chunks.get(id, ChunkIndexEntry(0, None))
  1012. if entry.refcount and size and not entry.size:
  1013. # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
  1014. # This is of course not possible for the AdHocCache.
  1015. # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
  1016. self.chunks[id] = entry._replace(size=size)
  1017. return entry.refcount
  1018. def chunk_incref(self, id, stats, size=None):
  1019. if not self._txn_active:
  1020. self.begin_txn()
  1021. count, _size = self.chunks.incref(id)
  1022. # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
  1023. # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
  1024. size = _size or size
  1025. assert size
  1026. stats.update(size, False)
  1027. return ChunkListEntry(id, size)
  1028. def chunk_decref(self, id, stats, wait=True):
  1029. if not self._txn_active:
  1030. self.begin_txn()
  1031. count, size = self.chunks.decref(id)
  1032. if count == 0:
  1033. del self.chunks[id]
  1034. self.repository.delete(id, wait=wait)
  1035. stats.update(-size, True)
  1036. else:
  1037. stats.update(-size, False)
  1038. def commit(self):
  1039. if not self._txn_active:
  1040. return
  1041. self.security_manager.save(self.manifest, self.key)
  1042. self._txn_active = False
  1043. def rollback(self):
  1044. self._txn_active = False
  1045. del self.chunks
  1046. def begin_txn(self):
  1047. self._txn_active = True
  1048. # Explicitly set the initial usable hash table capacity to avoid performance issues
  1049. # due to hash table "resonance".
  1050. # Since we're creating an archive, add 10 % from the start.
  1051. num_chunks = len(self.repository)
  1052. self.chunks = ChunkIndex(usable=num_chunks * 1.1)
  1053. pi = ProgressIndicatorPercent(
  1054. total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
  1055. )
  1056. t0 = perf_counter()
  1057. num_requests = 0
  1058. marker = None
  1059. while True:
  1060. result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
  1061. num_requests += 1
  1062. if not result:
  1063. break
  1064. pi.show(increase=len(result))
  1065. marker = result[-1]
  1066. # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
  1067. # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
  1068. # (e.g. checkpoint archives) are tracked correctly.
  1069. init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
  1070. for id_ in result:
  1071. self.chunks[id_] = init_entry
  1072. assert len(self.chunks) == num_chunks
  1073. # LocalCache does not contain the manifest, either.
  1074. del self.chunks[self.manifest.MANIFEST_ID]
  1075. duration = perf_counter() - t0 or 0.01
  1076. pi.finish()
  1077. logger.debug(
  1078. "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
  1079. num_chunks,
  1080. duration,
  1081. num_requests,
  1082. format_file_size(num_chunks * 34 / duration),
  1083. )
  1084. # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
  1085. # Protocol overhead is neglected in this calculation.
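# Rough worked example (made-up numbers) of the estimate logged above: with num_chunks = 1_000_000
# and duration = 5.0 s, num_chunks * 34 / duration = 6_800_000, i.e. about 6.8 MB/s of chunk ID
# list data downloaded.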