# cache.py

import configparser
import os
import stat
import shutil
from binascii import unhexlify
from collections import namedtuple

import msgpack

from .logger import create_logger
logger = create_logger()

from .constants import CACHE_README
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Location
from .helpers import Error
from .helpers import get_cache_dir, get_security_dir
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes, hostname_is_unique
from .helpers import remove_surrogates
from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
from .item import Item, ArchiveItem, ChunkListEntry
from .key import PlaintextKey
from .locking import Lock
from .platform import SaveFile
from .remote import cache_if_remote

FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size mtime chunk_ids')
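
# Illustrative sketch (values are hypothetical): files-cache entries are FileCacheEntry tuples,
# msgpack-serialized for compact storage in Cache.files (see _read_files / memorize_file below):
#
#   entry = FileCacheEntry(age=0, inode=123456, size=4096,
#                          mtime=int_to_bigint(1_500_000_000_000_000_000), chunk_ids=[b'\x00' * 32])
#   packed = msgpack.packb(entry)
#   restored = FileCacheEntry(*msgpack.unpackb(packed))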


class SecurityManager:
    def __init__(self, repository):
        self.repository = repository
        self.dir = get_security_dir(repository.id_str)
        self.key_type_file = os.path.join(self.dir, 'key-type')
        self.location_file = os.path.join(self.dir, 'location')
        self.manifest_ts_file = os.path.join(self.dir, 'manifest-timestamp')

    def known(self):
        return os.path.exists(self.key_type_file)

    def key_matches(self, key):
        if not self.known():
            return False
        try:
            with open(self.key_type_file, 'r') as fd:
                type = fd.read()
                return type == str(key.TYPE)
        except OSError as exc:
            logger.warning('Could not read/parse key type file: %s', exc)

    def save(self, manifest, key, cache):
        logger.debug('security: saving state for %s to %s', self.repository.id_str, self.dir)
        current_location = cache.repository._location.canonical_path()
        logger.debug('security: current location %s', current_location)
        logger.debug('security: key type %s', str(key.TYPE))
        logger.debug('security: manifest timestamp %s', manifest.timestamp)
        with open(self.location_file, 'w') as fd:
            fd.write(current_location)
        with open(self.key_type_file, 'w') as fd:
            fd.write(str(key.TYPE))
        with open(self.manifest_ts_file, 'w') as fd:
            fd.write(manifest.timestamp)

    def assert_location_matches(self, cache):
        # Warn user before sending data to a relocated repository
        try:
            with open(self.location_file) as fd:
                previous_location = fd.read()
            logger.debug('security: read previous_location %r', previous_location)
        except FileNotFoundError:
            logger.debug('security: previous_location file %s not found', self.location_file)
            previous_location = None
        except OSError as exc:
            logger.warning('Could not read previous location file: %s', exc)
            previous_location = None
        if cache.previous_location and previous_location != cache.previous_location:
            # Reconcile cache and security dir; we take the cache location.
            previous_location = cache.previous_location
            logger.debug('security: using previous_location of cache: %r', previous_location)
        if previous_location and previous_location != self.repository._location.canonical_path():
            msg = ("Warning: The repository at location {} was previously located at {}\n".format(
                self.repository._location.canonical_path(), previous_location) +
                "Do you want to continue? [yN] ")
            if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
                       retry=False, env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise Cache.RepositoryAccessAborted()
            # adapt on-disk config immediately if the new location was accepted
            logger.debug('security: updating location stored in cache and security dir')
            with open(self.location_file, 'w') as fd:
                fd.write(cache.repository._location.canonical_path())
            cache.begin_txn()
            cache.commit()

    def assert_no_manifest_replay(self, manifest, key, cache):
        try:
            with open(self.manifest_ts_file) as fd:
                timestamp = fd.read()
            logger.debug('security: read manifest timestamp %r', timestamp)
        except FileNotFoundError:
            logger.debug('security: manifest timestamp file %s not found', self.manifest_ts_file)
            timestamp = ''
        except OSError as exc:
            logger.warning('Could not read manifest timestamp file: %s', exc)
            timestamp = ''
        timestamp = max(timestamp, cache.timestamp or '')
        logger.debug('security: determined newest manifest timestamp as %s', timestamp)
        # If repository is older than the cache or security dir something fishy is going on
        if timestamp and timestamp > manifest.timestamp:
            if isinstance(key, PlaintextKey):
                raise Cache.RepositoryIDNotUnique()
            else:
                raise Cache.RepositoryReplay()

    def assert_key_type(self, key, cache):
        # Make sure an encrypted repository has not been swapped for an unencrypted repository
        if cache.key_type is not None and cache.key_type != str(key.TYPE):
            raise Cache.EncryptionMethodMismatch()
        if self.known() and not self.key_matches(key):
            raise Cache.EncryptionMethodMismatch()

    def assert_secure(self, manifest, key, cache):
        self.assert_location_matches(cache)
        self.assert_key_type(key, cache)
        self.assert_no_manifest_replay(manifest, key, cache)
        if not self.known():
            self.save(manifest, key, cache)

    def assert_access_unknown(self, warn_if_unencrypted, key):
        if warn_if_unencrypted and isinstance(key, PlaintextKey) and not self.known():
            msg = ("Warning: Attempting to access a previously unknown unencrypted repository!\n" +
                   "Do you want to continue? [yN] ")
            if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
                       retry=False, env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
                raise Cache.CacheInitAbortedError()
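
# Usage sketch (illustrative): Cache.__init__ below drives this class. A standalone caller would
# do roughly the following; all objects involved are assumed to exist already:
#
#   sm = SecurityManager(repository)
#   sm.assert_access_unknown(warn_if_unencrypted=True, key=key)   # before creating a new cache
#   sm.assert_secure(manifest, key, cache)                        # location / key type / replay checks
#
# assert_secure() persists the current state via save() the first time a repository is seen.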


class Cache:
    """Client Side cache
    """
    class RepositoryIDNotUnique(Error):
        """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""

    class RepositoryReplay(Error):
        """Cache is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    @staticmethod
    def break_lock(repository, path=None):
        path = path or os.path.join(get_cache_dir(), repository.id_str)
        Lock(os.path.join(path, 'lock'), exclusive=True).break_lock()

    @staticmethod
    def destroy(repository, path=None):
        """destroy the cache for ``repository`` or at ``path``"""
        path = path or os.path.join(get_cache_dir(), repository.id_str)
        config = os.path.join(path, 'config')
        if os.path.exists(config):
            os.remove(config)  # kill config first
            shutil.rmtree(path)

    def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
                 progress=False, lock_wait=None):
        """
        :param do_files: use file metadata cache
        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
        :param lock_wait: timeout for lock acquisition (None: return immediately if lock unavailable)
        :param sync: do :meth:`.sync`
        """
        self.lock = None
        self.timestamp = None
        self.txn_active = False
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.progress = progress
        self.path = path or os.path.join(get_cache_dir(), repository.id_str)
        self.security_manager = SecurityManager(repository)
        self.do_files = do_files
        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            self.security_manager.assert_access_unknown(warn_if_unencrypted, key)
            self.create()
        self.open(lock_wait=lock_wait)
        try:
            self.security_manager.assert_secure(manifest, key, self)
            if sync and self.manifest.id != self.manifest_id:
                self.sync()
                self.commit()
        except:
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
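
    # Typical use (sketch; repository, key and manifest are assumed to be set up by the caller):
    #
    #   with Cache(repository, key, manifest, do_files=True) as cache:
    #       ...                             # query / add chunks, memorize files
    #       cache.commit()
    #
    # __exit__ only releases the cache lock; commit() must be called explicitly to persist changes.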

    def __str__(self):
        fmt = """\
All archives:   {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
        return fmt.format(self.format_tuple())

    Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks',
                                     'total_chunks'])

    def stats(self):
        # XXX: this should really be moved down to `hashindex.pyx`
        stats = self.Summary(*self.chunks.summarize())._asdict()
        return stats

    def format_tuple(self):
        stats = self.stats()
        for field in ['total_size', 'total_csize', 'unique_csize']:
            stats[field] = format_file_size(stats[field])
        return self.Summary(**stats)

    def chunks_stored_size(self):
        return self.stats()['unique_csize']

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write(CACHE_README)
        config = configparser.ConfigParser(interpolation=None)
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', self.repository.id_str)
        config.set('cache', 'manifest', '')
        with SaveFile(os.path.join(self.path, 'config')) as fd:
            config.write(fd)
        ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with SaveFile(os.path.join(self.path, 'files'), binary=True) as fd:
            pass  # empty file

    def _check_upgrade(self, config_path):
        try:
            cache_version = self.config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                self.close()
                raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
                    config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            self.close()
            raise Exception('%s does not look like a Borg cache.' % config_path) from None
        # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741).
        cache_loc = self.config.get('cache', 'previous_location', fallback=None)
        if cache_loc:
            repo_loc = self.repository._location.canonical_path()
            rl = Location(repo_loc)
            cl = Location(cache_loc)
            if cl.proto == rl.proto and cl.user == rl.user and cl.host == rl.host and cl.port == rl.port \
                    and \
                    cl.path and rl.path and \
                    cl.path.startswith('/~/') and rl.path.startswith('/./') and cl.path[3:] == rl.path[3:]:
                # everything is same except the expected change in relative path canonicalization,
                # update previous_location to avoid warning / user query about changed location:
                self.config.set('cache', 'previous_location', repo_loc)

    def _do_open(self):
        self.config = configparser.ConfigParser(interpolation=None)
        config_path = os.path.join(self.path, 'config')
        self.config.read(config_path)
        self._check_upgrade(config_path)
        self.id = self.config.get('cache', 'repository')
        self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
        self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
        self.key_type = self.config.get('cache', 'key_type', fallback=None)
        self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
        self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
        self.files = None

    def open(self, lock_wait=None):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache' % self.path)
        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait,
                         kill_stale_locks=hostname_is_unique()).acquire()
        self.rollback()

    def close(self):
        if self.lock is not None:
            self.lock.release()
            self.lock = None

    def _read_files(self):
        self.files = {}
        self._newest_mtime = None
        logger.debug('Reading files cache ...')
        with open(os.path.join(self.path, 'files'), 'rb') as fd:
            u = msgpack.Unpacker(use_list=True)
            while True:
                data = fd.read(64 * 1024)
                if not data:
                    break
                u.feed(data)
                for path_hash, item in u:
                    entry = FileCacheEntry(*item)
                    # in the end, this takes about 240 Bytes per file
                    self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))

    def begin_txn(self):
        # Initialize transaction snapshot
        pi = ProgressIndicatorMessage(msgid='cache.begin_transaction')
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        pi.output('Initializing cache transaction: Reading config')
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        pi.output('Initializing cache transaction: Reading chunks')
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        pi.output('Initializing cache transaction: Reading files')
        shutil.copy(os.path.join(self.path, 'files'), txn_dir)
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True
        pi.finish()

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        self.security_manager.save(self.manifest, self.key, self)
        pi = ProgressIndicatorMessage(msgid='cache.commit')
        if self.files is not None:
            if self._newest_mtime is None:
                # was never set because no files were modified/added
                self._newest_mtime = 2 ** 63 - 1  # nanoseconds, good until y2262
            ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20))
            pi.output('Saving files cache')
            with SaveFile(os.path.join(self.path, 'files'), binary=True) as fd:
                for path_hash, item in self.files.items():
                    # Only keep files seen in this backup that are older than newest mtime seen in this backup -
                    # this is to avoid issues with filesystem snapshots and mtime granularity.
                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                    entry = FileCacheEntry(*msgpack.unpackb(item))
                    if entry.age == 0 and bigint_to_int(entry.mtime) < self._newest_mtime or \
                       entry.age > 0 and entry.age < ttl:
                        msgpack.pack((path_hash, entry), fd)
        pi.output('Saving cache config')
        self.config.set('cache', 'manifest', self.manifest.id_str)
        self.config.set('cache', 'timestamp', self.manifest.timestamp)
        self.config.set('cache', 'key_type', str(self.key.TYPE))
        self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
        with SaveFile(os.path.join(self.path, 'config')) as fd:
            self.config.write(fd)
        pi.output('Saving chunks cache')
        self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        pi.finish()

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, 'files'), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()
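
    # Transaction sketch: begin_txn() snapshots config/chunks/files into txn.tmp and renames it to
    # txn.active; commit() persists the new state and drops the snapshot; open() -> rollback()
    # restores the snapshot if a previous run died mid-transaction. Illustrative order of calls:
    #
    #   cache.begin_txn()                   # also called implicitly by add_chunk()/chunk_incref()
    #   cache.add_chunk(id, data, stats)
    #   cache.commit()                      # without commit(), the next open() rolls back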

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch infos from repo and build a chunk index once per backup
        archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
        """
        archive_path = os.path.join(self.path, 'chunks.archive.d')

        def mkpath(id, suffix=''):
            id_hex = bin_to_hex(id)
            path = os.path.join(archive_path, id_hex + suffix)
            return path.encode('utf-8')

        def cached_archives():
            if self.do_cache:
                fns = os.listdir(archive_path)
                # filenames with 64 hex digits == 256bit
                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
            else:
                return set()

        def repo_archives():
            return set(info.id for info in self.manifest.archives.list())

        def cleanup_outdated(ids):
            for id in ids:
                os.unlink(mkpath(id))

        def fetch_and_build_idx(archive_id, repository, key, chunk_idx):
            cdata = repository.get(archive_id)
            data = key.decrypt(archive_id, cdata)
            chunk_idx.add(archive_id, 1, len(data), len(cdata))
            archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
            if archive.version != 1:
                raise Exception('Unknown archive metadata version')
            unpacker = msgpack.Unpacker()
            for item_id, chunk in zip(archive.items, repository.get_many(archive.items)):
                data = key.decrypt(item_id, chunk)
                chunk_idx.add(item_id, 1, len(data), len(chunk))
                unpacker.feed(data)
                for item in unpacker:
                    if not isinstance(item, dict):
                        logger.error('Error: Did not get expected metadata dict - archive corrupted!')
                        continue
                    for chunk_id, size, csize in item.get(b'chunks', []):
                        chunk_idx.add(chunk_id, 1, size, csize)
            if self.do_cache:
                fn = mkpath(archive_id)
                fn_tmp = mkpath(archive_id, suffix='.tmp')
                try:
                    chunk_idx.write(fn_tmp)
                except Exception:
                    os.unlink(fn_tmp)
                else:
                    os.rename(fn_tmp, fn)

        def lookup_name(archive_id):
            for info in self.manifest.archives.list():
                if info.id == archive_id:
                    return info.name

        def create_master_idx(chunk_idx):
            logger.info('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
                len(archive_ids), len(cached_ids),
                len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            if archive_ids:
                chunk_idx = None
                if self.progress:
                    pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                                  msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                                  msgid='cache.sync')
                for archive_id in archive_ids:
                    archive_name = lookup_name(archive_id)
                    if self.progress:
                        pi.show(info=[remove_surrogates(archive_name)])
                    if self.do_cache:
                        if archive_id in cached_ids:
                            archive_chunk_idx_path = mkpath(archive_id)
                            logger.info("Reading cached archive chunk index for %s ..." % archive_name)
                            archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
                        else:
                            logger.info('Fetching and building archive index for %s ...' % archive_name)
                            archive_chunk_idx = ChunkIndex()
                            fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx)
                        logger.info("Merging into master chunks index ...")
                        if chunk_idx is None:
                            # we just use the first archive's idx as starting point,
                            # to avoid growing the hash table from 0 size and also
                            # to save 1 merge call.
                            chunk_idx = archive_chunk_idx
                        else:
                            chunk_idx.merge(archive_chunk_idx)
                    else:
                        chunk_idx = chunk_idx or ChunkIndex()
                        logger.info('Fetching archive index for %s ...' % archive_name)
                        fetch_and_build_idx(archive_id, repository, self.key, chunk_idx)
                if self.progress:
                    pi.finish()
            logger.info('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except:
                pass
            try:
                os.mkdir(archive_path)
            except:
                pass

        self.begin_txn()
        with cache_if_remote(self.repository) as repository:
            legacy_cleanup()
            # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
            # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
            self.do_cache = os.path.isdir(archive_path)
            self.chunks = create_master_idx(self.chunks)
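
    # Note (see the TEMPORARY HACK above): per-archive index caching can be disabled by replacing
    # the chunks.archive.d directory with a plain file of the same name, e.g. (illustrative):
    #
    #   d = os.path.join(get_cache_dir(), repository.id_str, 'chunks.archive.d')
    #   shutil.rmtree(d); open(d, 'w').close()   # sync() then sets do_cache = False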

    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
        if not self.txn_active:
            self.begin_txn()
        size = len(chunk)
        refcount = self.seen_chunk(id, size)
        if refcount and not overwrite:
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(chunk)
        csize = len(data)
        self.repository.put(id, data, wait=wait)
        self.chunks.add(id, 1, size, csize)
        stats.update(size, csize, not refcount)
        return ChunkListEntry(id, size, csize)

    def seen_chunk(self, id, size=None):
        refcount, stored_size, _ = self.chunks.get(id, ChunkIndexEntry(0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks.incref(id)
        stats.update(size, csize, False)
        return ChunkListEntry(id, size, csize)

    def chunk_decref(self, id, stats, wait=True):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, -csize, True)
        else:
            stats.update(-size, -csize, False)
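
    # Refcount sketch (illustrative): a chunk id is written to the repository only on first add;
    # further adds of the same id become increfs, and decref deletes once the count reaches 0:
    #
    #   cache.add_chunk(id, data, stats)    # refcount 0 -> 1, repository.put()
    #   cache.add_chunk(id, data, stats)    # refcount 1 -> 2, no repository write
    #   cache.chunk_decref(id, stats)       # refcount 2 -> 1
    #   cache.chunk_decref(id, stats)       # refcount 1 -> 0, repository.delete()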

    def file_known_and_unchanged(self, path_hash, st, ignore_inode=False):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return None
        if self.files is None:
            self._read_files()
        entry = self.files.get(path_hash)
        if not entry:
            return None
        entry = FileCacheEntry(*msgpack.unpackb(entry))
        if (entry.size == st.st_size and bigint_to_int(entry.mtime) == st.st_mtime_ns and
                (ignore_inode or entry.inode == st.st_ino)):
            # we ignored the inode number in the comparison above or it is still same.
            # if it is still the same, replacing it in the tuple doesn't change it.
            # if we ignored it, a reason for doing that is that files were moved to a new
            # disk / new fs (so a one-time change of inode number is expected) and we wanted
            # to avoid everything getting chunked again. to be able to re-enable the inode
            # number comparison in a future backup run (and avoid chunking everything
            # again at that time), we need to update the inode number in the cache with what
            # we see in the filesystem.
            self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
            return entry.chunk_ids
        else:
            return None

    def memorize_file(self, path_hash, st, ids):
        if not (self.do_files and stat.S_ISREG(st.st_mode)):
            return
        mtime_ns = safe_ns(st.st_mtime_ns)
        entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, mtime=int_to_bigint(mtime_ns), chunk_ids=ids)
        self.files[path_hash] = msgpack.packb(entry)
        self._newest_mtime = max(self._newest_mtime or 0, mtime_ns)
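
    # Files-cache usage sketch (illustrative; `st` is an os.stat_result of a regular file and
    # `read_chunks` stands in for the caller's chunking code - both are hypothetical here):
    #
    #   ids = cache.file_known_and_unchanged(path_hash, st)
    #   if ids is None:                               # cache miss: file is new or changed
    #       ids = [cache.add_chunk(id, data, stats).id for id, data in read_chunks(path)]
    #       cache.memorize_file(path_hash, st, ids)
    #
    # A hit returns the stored chunk_ids and resets the entry's age to 0, so the entry survives
    # the BORG_FILES_CACHE_TTL pruning done in commit().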