|  | @@ -13,6 +13,8 @@ logger = create_logger()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  files_cache_logger = create_logger("borg.debug.files_cache")
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +from borgstore.store import ItemInfo
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
 | 
	
		
			
				|  |  |  from .checksums import xxh64
 | 
	
		
			
				|  |  |  from .hashindex import ChunkIndex, ChunkIndexEntry
 | 
	
	
		
			
				|  | @@ -663,63 +665,101 @@ class FilesCacheMixin:
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def load_chunks_hash(repository) -> bytes:
 | 
	
		
			
				|  |  | +def try_upgrade_to_b14(repository):
 | 
	
		
			
				|  |  | +    # TODO: remove this before 2.0.0 release
 | 
	
		
			
				|  |  |      try:
 | 
	
		
			
				|  |  |          hash = repository.store_load("cache/chunks_hash")
 | 
	
		
			
				|  |  | -        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
 | 
	
		
			
				|  |  |      except (Repository.ObjectNotFound, StoreObjectNotFound):
 | 
	
		
			
				|  |  |          # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
 | 
	
		
			
				|  |  | -        hash = b""
 | 
	
		
			
				|  |  | -        logger.debug("cache/chunks_hash missing!")
 | 
	
		
			
				|  |  | -    return hash
 | 
	
		
			
				|  |  | +        pass  # likely already upgraded
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +        old_name = "cache/chunks"
 | 
	
		
			
				|  |  | +        new_name = f"cache/chunks.{bin_to_hex(hash)}"
 | 
	
		
			
				|  |  | +        logger.debug(f"renaming {old_name} to {new_name}.")
 | 
	
		
			
				|  |  | +        repository.store_move(old_name, new_name)
 | 
	
		
			
				|  |  | +        repository.store_delete("cache/chunks_hash")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def list_chunkindex_hashes(repository):
 | 
	
		
			
				|  |  | +    hashes = set()
 | 
	
		
			
				|  |  | +    for info in repository.store_list("cache"):
 | 
	
		
			
				|  |  | +        info = ItemInfo(*info)  # RPC does not give namedtuple
 | 
	
		
			
				|  |  | +        if info.name.startswith("chunks."):
 | 
	
		
			
				|  |  | +            hash = info.name.removeprefix("chunks.")
 | 
	
		
			
				|  |  | +            hashes.add(hash)
 | 
	
		
			
				|  |  | +    logger.debug(f"cached chunk indexes: {hashes}")
 | 
	
		
			
				|  |  | +    return hashes
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def delete_chunkindex_cache(repository):
 | 
	
		
			
				|  |  | +    hashes = list_chunkindex_hashes(repository)
 | 
	
		
			
				|  |  | +    for hash in hashes:
 | 
	
		
			
				|  |  | +        cache_name = f"cache/chunks.{hash}"
 | 
	
		
			
				|  |  | +        try:
 | 
	
		
			
				|  |  | +            repository.store_delete(cache_name)
 | 
	
		
			
				|  |  | +        except (Repository.ObjectNotFound, StoreObjectNotFound):
 | 
	
		
			
				|  |  | +            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
 | 
	
		
			
				|  |  | +            pass
 | 
	
		
			
				|  |  | +    logger.debug(f"cached chunk indexes deleted: {hashes}")
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  CHUNKINDEX_HASH_SEED = 2
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False):
 | 
	
		
			
				|  |  | -    cached_hash = load_chunks_hash(repository)
 | 
	
		
			
				|  |  | +def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False, delete_other=False):
 | 
	
		
			
				|  |  | +    cached_hashes = list_chunkindex_hashes(repository)
 | 
	
		
			
				|  |  |      with io.BytesIO() as f:
 | 
	
		
			
				|  |  |          chunks.write(f)
 | 
	
		
			
				|  |  |          data = f.getvalue()
 | 
	
		
			
				|  |  |      if clear:
 | 
	
		
			
				|  |  |          # if we don't need the in-memory chunks index anymore:
 | 
	
		
			
				|  |  |          chunks.clear()  # free memory, immediately
 | 
	
		
			
				|  |  | -    new_hash = xxh64(data, seed=CHUNKINDEX_HASH_SEED)
 | 
	
		
			
				|  |  | -    if force_write or new_hash != cached_hash:
 | 
	
		
			
				|  |  | -        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
 | 
	
		
			
				|  |  | +    new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
 | 
	
		
			
				|  |  | +    if force_write or new_hash not in cached_hashes:
 | 
	
		
			
				|  |  | +        # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
 | 
	
		
			
				|  |  |          # when a client is loading the chunks index from a cache, it has to compare its xxh64
 | 
	
		
			
				|  |  | -        # hash against cache/chunks_hash in the repository. if it is the same, the cache
 | 
	
		
			
				|  |  | -        # is valid. If it is different, the cache is either corrupted or out of date and
 | 
	
		
			
				|  |  | -        # has to be discarded.
 | 
	
		
			
				|  |  | -        # when some functionality is DELETING chunks from the repository, it has to either update
 | 
	
		
			
				|  |  | -        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
 | 
	
		
			
				|  |  | +        # hash against the hash in its name. if it is the same, the cache is valid.
 | 
	
		
			
				|  |  | +        # if it is different, the cache is either corrupted or out of date and has to be discarded.
 | 
	
		
			
				|  |  | +        # when some functionality is DELETING chunks from the repository, it has to delete
 | 
	
		
			
				|  |  | +        # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
 | 
	
		
			
				|  |  |          # so that all clients will discard any client-local chunks index caches.
 | 
	
		
			
				|  |  | -        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
 | 
	
		
			
				|  |  | -        repository.store_store("cache/chunks", data)
 | 
	
		
			
				|  |  | -        repository.store_store("cache/chunks_hash", new_hash)
 | 
	
		
			
				|  |  | +        cache_name = f"cache/chunks.{new_hash}"
 | 
	
		
			
				|  |  | +        logger.debug(f"caching chunks index as {cache_name} in repository...")
 | 
	
		
			
				|  |  | +        repository.store_store(cache_name, data)
 | 
	
		
			
				|  |  | +        if delete_other:
 | 
	
		
			
				|  |  | +            for hash in cached_hashes:
 | 
	
		
			
				|  |  | +                cache_name = f"cache/chunks.{hash}"
 | 
	
		
			
				|  |  | +                try:
 | 
	
		
			
				|  |  | +                    repository.store_delete(cache_name)
 | 
	
		
			
				|  |  | +                except (Repository.ObjectNotFound, StoreObjectNotFound):
 | 
	
		
			
				|  |  | +                    # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
 | 
	
		
			
				|  |  | +                    pass
 | 
	
		
			
				|  |  | +            logger.debug(f"cached chunk indexes deleted: {cached_hashes}")
 | 
	
		
			
				|  |  |      return new_hash
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
 | 
	
		
			
				|  |  | -    chunks = None
 | 
	
		
			
				|  |  | +    try_upgrade_to_b14(repository)
 | 
	
		
			
				|  |  |      # first, try to load a pre-computed and centrally cached chunks index:
 | 
	
		
			
				|  |  |      if not disable_caches:
 | 
	
		
			
				|  |  | -        wanted_hash = load_chunks_hash(repository)
 | 
	
		
			
				|  |  | -        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
 | 
	
		
			
				|  |  | -        try:
 | 
	
		
			
				|  |  | -            chunks_data = repository.store_load("cache/chunks")
 | 
	
		
			
				|  |  | -        except (Repository.ObjectNotFound, StoreObjectNotFound):
 | 
	
		
			
				|  |  | -            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
 | 
	
		
			
				|  |  | -            logger.debug("cache/chunks not found in the repository.")
 | 
	
		
			
				|  |  | -        else:
 | 
	
		
			
				|  |  | -            if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == wanted_hash:
 | 
	
		
			
				|  |  | -                logger.debug("cache/chunks is valid.")
 | 
	
		
			
				|  |  | -                with io.BytesIO(chunks_data) as f:
 | 
	
		
			
				|  |  | -                    chunks = ChunkIndex.read(f)
 | 
	
		
			
				|  |  | -                return chunks
 | 
	
		
			
				|  |  | +        hashes = list_chunkindex_hashes(repository)
 | 
	
		
			
				|  |  | +        assert len(hashes) <= 1, f"chunk indexes: {hashes}"  # later we change to multiple chunkindexes...
 | 
	
		
			
				|  |  | +        for hash in hashes:
 | 
	
		
			
				|  |  | +            cache_name = f"cache/chunks.{hash}"
 | 
	
		
			
				|  |  | +            logger.debug(f"trying to load {cache_name} from the repo...")
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                chunks_data = repository.store_load(cache_name)
 | 
	
		
			
				|  |  | +            except (Repository.ObjectNotFound, StoreObjectNotFound):
 | 
	
		
			
				|  |  | +                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
 | 
	
		
			
				|  |  | +                logger.debug(f"{cache_name} not found in the repository.")
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  | -                logger.debug("cache/chunks is invalid.")
 | 
	
		
			
				|  |  | +                if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
 | 
	
		
			
				|  |  | +                    logger.debug(f"{cache_name} is valid.")
 | 
	
		
			
				|  |  | +                    with io.BytesIO(chunks_data) as f:
 | 
	
		
			
				|  |  | +                        chunks = ChunkIndex.read(f)
 | 
	
		
			
				|  |  | +                    return chunks
 | 
	
		
			
				|  |  | +                else:
 | 
	
		
			
				|  |  | +                    logger.debug(f"{cache_name} is invalid.")
 | 
	
		
			
				|  |  |      # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
 | 
	
		
			
				|  |  |      logger.debug("querying the chunk IDs list from the repo...")
 | 
	
		
			
				|  |  |      chunks = ChunkIndex()
 | 
	
	
		
			
				|  | @@ -741,7 +781,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
 | 
	
		
			
				|  |  |      logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
 | 
	
		
			
				|  |  |      if cache_immediately:
 | 
	
		
			
				|  |  |          # immediately update cache/chunks, so we only rarely have to do it the slow way:
 | 
	
		
			
				|  |  | -        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True)
 | 
	
		
			
				|  |  | +        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
 | 
	
		
			
				|  |  |      return chunks
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -817,7 +857,7 @@ class ChunksMixin:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def _write_chunks_cache(self, chunks):
 | 
	
		
			
				|  |  |          # this is called from .close, so we can clear here:
 | 
	
		
			
				|  |  | -        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
 | 
	
		
			
				|  |  | +        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=True)
 | 
	
		
			
				|  |  |          self._chunks = None  # nothing there (cleared!)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def refresh_lock(self, now):
 |