Bladeren bron

Add spot consistency check (#656).

Reviewed-on: https://projects.torsion.org/borgmatic-collective/borgmatic/pulls/849
Dan Helfman 1 jaar geleden
bovenliggende
commit
f9182514d8

+ 6 - 0
NEWS

@@ -1,4 +1,9 @@
 1.8.10.dev0
+ * #656 (beta): Add a "spot" consistency check that compares file counts and contents between your
+   source files and the latest archive, ensuring they fall within configured tolerances. This can
+   catch problems like incorrect excludes, inadvertent deletes, files changed by malware, etc. See
+   the documentation for more information:
+   https://torsion.org/borgmatic/docs/how-to/deal-with-very-large-backups/#spot-check
  * #842: When a command hook exits with a soft failure, ping the log and finish states for any
    configured monitoring hooks.
  * #843: Add documentation link to Loki dashboard for borgmatic:
@@ -8,6 +13,7 @@
  * Add documentation about backing up containerized databases by configuring borgmatic to exec into
    a container to run a dump command:
    https://torsion.org/borgmatic/docs/how-to/backup-your-databases/#containers
+ *
 
 1.8.9
  * #311: Add custom dump/restore command options for MySQL and MariaDB.

+ 614 - 2
borgmatic/actions/check.py

@@ -1,12 +1,569 @@
+import datetime
+import hashlib
+import itertools
 import logging
+import os
+import pathlib
+import random
 
 import borgmatic.borg.check
+import borgmatic.borg.create
+import borgmatic.borg.environment
+import borgmatic.borg.extract
+import borgmatic.borg.list
+import borgmatic.borg.rlist
+import borgmatic.borg.state
 import borgmatic.config.validate
+import borgmatic.execute
 import borgmatic.hooks.command
 
+DEFAULT_CHECKS = (
+    {'name': 'repository', 'frequency': '1 month'},
+    {'name': 'archives', 'frequency': '1 month'},
+)
+
+
 logger = logging.getLogger(__name__)
 
 
+def parse_checks(config, only_checks=None):
+    '''
+    Given a configuration dict with a "checks" sequence of dicts and an optional list of override
+    checks, return a tuple of named checks to run.
+
+    For example, given a config of:
+
+        {'checks': ({'name': 'repository'}, {'name': 'archives'})}
+
+    This will be returned as:
+
+        ('repository', 'archives')
+
+    If no "checks" option is present in the config, return the DEFAULT_CHECKS. If a checks value
+    has a name of "disabled", return an empty tuple, meaning that no checks should be run.
+    '''
+    checks = only_checks or tuple(
+        check_config['name'] for check_config in (config.get('checks', None) or DEFAULT_CHECKS)
+    )
+    checks = tuple(check.lower() for check in checks)
+
+    if 'disabled' in checks:
+        logger.warning(
+            'The "disabled" value for the "checks" option is deprecated and will be removed from a future release; use "skip_actions" instead'
+        )
+        if len(checks) > 1:
+            logger.warning(
+                'Multiple checks are configured, but one of them is "disabled"; not running any checks'
+            )
+        return ()
+
+    return checks
+
+
+def parse_frequency(frequency):
+    '''
+    Given a frequency string with a number and a unit of time, return a corresponding
+    datetime.timedelta instance or None if the frequency is None or "always".
+
+    For instance, given "3 weeks", return datetime.timedelta(weeks=3)
+
+    Raise ValueError if the given frequency cannot be parsed.
+    '''
+    if not frequency:
+        return None
+
+    frequency = frequency.strip().lower()
+
+    if frequency == 'always':
+        return None
+
+    try:
+        number, time_unit = frequency.split(' ')
+        number = int(number)
+    except ValueError:
+        raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
+
+    if not time_unit.endswith('s'):
+        time_unit += 's'
+
+    if time_unit == 'months':
+        number *= 30
+        time_unit = 'days'
+    elif time_unit == 'years':
+        number *= 365
+        time_unit = 'days'
+
+    try:
+        return datetime.timedelta(**{time_unit: number})
+    except TypeError:
+        raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
+
+
+def filter_checks_on_frequency(
+    config,
+    borg_repository_id,
+    checks,
+    force,
+    archives_check_id=None,
+):
+    '''
+    Given a configuration dict with a "checks" sequence of dicts, a Borg repository ID, a sequence
+    of checks, whether to force checks to run, and an ID for the archives check potentially being
+    run (if any), filter down those checks based on the configured "frequency" for each check as
+    compared to its check time file.
+
+    In other words, a check whose check time file's timestamp is too new (based on the configured
+    frequency) will get cut from the returned sequence of checks. Example:
+
+    config = {
+        'checks': [
+            {
+                'name': 'archives',
+                'frequency': '2 weeks',
+            },
+        ]
+    }
+
+    When this function is called with that config and "archives" in checks, "archives" will get
+    filtered out of the returned result if its check time file is newer than 2 weeks old, indicating
+    that it's not yet time to run that check again.
+
+    Raise ValueError if a frequency cannot be parsed.
+    '''
+    if not checks:
+        return checks
+
+    filtered_checks = list(checks)
+
+    if force:
+        return tuple(filtered_checks)
+
+    for check_config in config.get('checks', DEFAULT_CHECKS):
+        check = check_config['name']
+        if checks and check not in checks:
+            continue
+
+        frequency_delta = parse_frequency(check_config.get('frequency'))
+        if not frequency_delta:
+            continue
+
+        check_time = probe_for_check_time(config, borg_repository_id, check, archives_check_id)
+        if not check_time:
+            continue
+
+        # If we've not yet reached the time when the frequency dictates we're ready for another
+        # check, skip this check.
+        if datetime.datetime.now() < check_time + frequency_delta:
+            remaining = check_time + frequency_delta - datetime.datetime.now()
+            logger.info(
+                f'Skipping {check} check due to configured frequency; {remaining} until next check (use --force to check anyway)'
+            )
+            filtered_checks.remove(check)
+
+    return tuple(filtered_checks)
+
+
+def make_archives_check_id(archive_filter_flags):
+    '''
+    Given a sequence of flags to filter archives, return a unique hash corresponding to those
+    particular flags. If there are no flags, return None.
+    '''
+    if not archive_filter_flags:
+        return None
+
+    return hashlib.sha256(' '.join(archive_filter_flags).encode()).hexdigest()
+
+
+def make_check_time_path(config, borg_repository_id, check_type, archives_check_id=None):
+    '''
+    Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
+    "archives", etc.), and a unique hash of the archives filter flags, return a path for recording
+    that check's time (the time of that check last occurring).
+    '''
+    borgmatic_source_directory = os.path.expanduser(
+        config.get(
+            'borgmatic_source_directory', borgmatic.borg.state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY
+        )
+    )
+
+    if check_type in ('archives', 'data'):
+        return os.path.join(
+            borgmatic_source_directory,
+            'checks',
+            borg_repository_id,
+            check_type,
+            archives_check_id if archives_check_id else 'all',
+        )
+
+    return os.path.join(
+        borgmatic_source_directory,
+        'checks',
+        borg_repository_id,
+        check_type,
+    )
+
+
+def write_check_time(path):  # pragma: no cover
+    '''
+    Record a check time of now as the modification time of the given path.
+    '''
+    logger.debug(f'Writing check time at {path}')
+
+    os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
+    pathlib.Path(path, mode=0o600).touch()
+
+
+def read_check_time(path):
+    '''
+    Return the check time based on the modification time of the given path. Return None if the path
+    doesn't exist.
+    '''
+    logger.debug(f'Reading check time from {path}')
+
+    try:
+        return datetime.datetime.fromtimestamp(os.stat(path).st_mtime)
+    except FileNotFoundError:
+        return None
+
+
+def probe_for_check_time(config, borg_repository_id, check, archives_check_id):
+    '''
+    Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
+    "archives", etc.), and a unique hash of the archives filter flags, return a the corresponding
+    check time or None if such a check time does not exist.
+
+    When the check type is "archives" or "data", this function probes two different paths to find
+    the check time, e.g.:
+
+      ~/.borgmatic/checks/1234567890/archives/9876543210
+      ~/.borgmatic/checks/1234567890/archives/all
+
+    ... and returns the maximum modification time of the files found (if any). The first path
+    represents a more specific archives check time (a check on a subset of archives), and the second
+    is a fallback to the last "all" archives check.
+
+    For other check types, this function reads from a single check time path, e.g.:
+
+      ~/.borgmatic/checks/1234567890/repository
+    '''
+    check_times = (
+        read_check_time(group[0])
+        for group in itertools.groupby(
+            (
+                make_check_time_path(config, borg_repository_id, check, archives_check_id),
+                make_check_time_path(config, borg_repository_id, check),
+            )
+        )
+    )
+
+    try:
+        return max(check_time for check_time in check_times if check_time)
+    except ValueError:
+        return None
+
+
+def upgrade_check_times(config, borg_repository_id):
+    '''
+    Given a configuration dict and a Borg repository ID, upgrade any corresponding check times on
+    disk from old-style paths to new-style paths.
+
+    Currently, the only upgrade performed is renaming an archive or data check path that looks like:
+
+      ~/.borgmatic/checks/1234567890/archives
+
+    to:
+
+      ~/.borgmatic/checks/1234567890/archives/all
+    '''
+    for check_type in ('archives', 'data'):
+        new_path = make_check_time_path(config, borg_repository_id, check_type, 'all')
+        old_path = os.path.dirname(new_path)
+        temporary_path = f'{old_path}.temp'
+
+        if not os.path.isfile(old_path) and not os.path.isfile(temporary_path):
+            continue
+
+        logger.debug(f'Upgrading archives check time from {old_path} to {new_path}')
+
+        try:
+            os.rename(old_path, temporary_path)
+        except FileNotFoundError:
+            pass
+
+        os.mkdir(old_path)
+        os.rename(temporary_path, new_path)
+
+
+def collect_spot_check_source_paths(
+    repository, config, local_borg_version, global_arguments, local_path, remote_path
+):
+    '''
+    Given a repository configuration dict, a configuration dict, the local Borg version, global
+    arguments as an argparse.Namespace instance, the local Borg path, and the remote Borg path,
+    collect the source paths that Borg would use in an actual create (but only include files and
+    symlinks).
+    '''
+    stream_processes = any(
+        borgmatic.hooks.dispatch.call_hooks(
+            'use_streaming',
+            config,
+            repository['path'],
+            borgmatic.hooks.dump.DATA_SOURCE_HOOK_NAMES,
+        ).values()
+    )
+
+    (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
+        borgmatic.borg.create.make_base_create_command(
+            dry_run=True,
+            repository_path=repository['path'],
+            config=config,
+            config_paths=(),
+            local_borg_version=local_borg_version,
+            global_arguments=global_arguments,
+            borgmatic_source_directories=(),
+            local_path=local_path,
+            remote_path=remote_path,
+            list_files=True,
+            stream_processes=stream_processes,
+        )
+    )
+    borg_environment = borgmatic.borg.environment.make_environment(config)
+
+    try:
+        working_directory = os.path.expanduser(config.get('working_directory'))
+    except TypeError:
+        working_directory = None
+
+    paths_output = borgmatic.execute.execute_command_and_capture_output(
+        create_flags + create_positional_arguments,
+        capture_stderr=True,
+        working_directory=working_directory,
+        extra_environment=borg_environment,
+        borg_local_path=local_path,
+        borg_exit_codes=config.get('borg_exit_codes'),
+    )
+
+    paths = tuple(
+        path_line.split(' ', 1)[1]
+        for path_line in paths_output.split('\n')
+        if path_line and path_line.startswith('- ') or path_line.startswith('+ ')
+    )
+
+    return tuple(path for path in paths if os.path.isfile(path) or os.path.islink(path))
+
+
+BORG_DIRECTORY_FILE_TYPE = 'd'
+
+
+def collect_spot_check_archive_paths(
+    repository, archive, config, local_borg_version, global_arguments, local_path, remote_path
+):
+    '''
+    Given a repository configuration dict, the name of the latest archive, a configuration dict, the
+    local Borg version, global arguments as an argparse.Namespace instance, the local Borg path, and
+    the remote Borg path, collect the paths from the given archive (but only include files and
+    symlinks).
+    '''
+    borgmatic_source_directory = os.path.expanduser(
+        config.get(
+            'borgmatic_source_directory', borgmatic.borg.state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY
+        )
+    )
+
+    return tuple(
+        path
+        for line in borgmatic.borg.list.capture_archive_listing(
+            repository['path'],
+            archive,
+            config,
+            local_borg_version,
+            global_arguments,
+            path_format='{type} /{path}{NL}',  # noqa: FS003
+            local_path=local_path,
+            remote_path=remote_path,
+        )
+        for (file_type, path) in (line.split(' ', 1),)
+        if file_type != BORG_DIRECTORY_FILE_TYPE
+        if pathlib.Path(borgmatic_source_directory) not in pathlib.Path(path).parents
+    )
+
+
+def compare_spot_check_hashes(
+    repository,
+    archive,
+    config,
+    local_borg_version,
+    global_arguments,
+    local_path,
+    remote_path,
+    log_label,
+    source_paths,
+):
+    '''
+    Given a repository configuration dict, the name of the latest archive, a configuration dict, the
+    local Borg version, global arguments as an argparse.Namespace instance, the local Borg path, the
+    remote Borg path, a log label, and spot check source paths, compare the hashes for a sampling of
+    the source paths with hashes from corresponding paths in the given archive. Return a sequence of
+    the paths that fail that hash comparison.
+    '''
+    # Based on the configured sample percentage, come up with a list of random sample files from the
+    # source directories.
+    spot_check_config = next(check for check in config['checks'] if check['name'] == 'spot')
+    sample_count = max(
+        int(len(source_paths) * (min(spot_check_config['data_sample_percentage'], 100) / 100)), 1
+    )
+    source_sample_paths = tuple(random.sample(source_paths, sample_count))
+    existing_source_sample_paths = {
+        source_path for source_path in source_sample_paths if os.path.exists(source_path)
+    }
+    logger.debug(
+        f'{log_label}: Sampling {sample_count} source paths (~{spot_check_config["data_sample_percentage"]}%) for spot check'
+    )
+
+    # Hash each file in the sample paths (if it exists).
+    hash_output = borgmatic.execute.execute_command_and_capture_output(
+        (spot_check_config.get('xxh64sum_command', 'xxh64sum'),)
+        + tuple(path for path in source_sample_paths if path in existing_source_sample_paths)
+    )
+
+    source_hashes = dict(
+        (reversed(line.split('  ', 1)) for line in hash_output.splitlines()),
+        **{path: '' for path in source_sample_paths if path not in existing_source_sample_paths},
+    )
+
+    archive_hashes = dict(
+        reversed(line.split(' ', 1))
+        for line in borgmatic.borg.list.capture_archive_listing(
+            repository['path'],
+            archive,
+            config,
+            local_borg_version,
+            global_arguments,
+            list_paths=source_sample_paths,
+            path_format='{xxh64} /{path}{NL}',  # noqa: FS003
+            local_path=local_path,
+            remote_path=remote_path,
+        )
+        if line
+    )
+
+    # Compare the source hashes with the archive hashes to see how many match.
+    failing_paths = []
+
+    for path, source_hash in source_hashes.items():
+        archive_hash = archive_hashes.get(path)
+
+        if archive_hash is not None and archive_hash == source_hash:
+            continue
+
+        failing_paths.append(path)
+
+    return tuple(failing_paths)
+
+
+def spot_check(
+    repository,
+    config,
+    local_borg_version,
+    global_arguments,
+    local_path,
+    remote_path,
+):
+    '''
+    Given a repository dict, a loaded configuration dict, the local Borg version, global arguments
+    as an argparse.Namespace instance, the local Borg path, and the remote Borg path, perform a spot
+    check for the latest archive in the given repository.
+
+    A spot check compares file counts and also the hashes for a random sampling of source files on
+    disk to those stored in the latest archive. If any differences are beyond configured tolerances,
+    then the check fails.
+    '''
+    log_label = f'{repository.get("label", repository["path"])}'
+    logger.debug(f'{log_label}: Running spot check')
+    spot_check_config = next(check for check in config['checks'] if check['name'] == 'spot')
+
+    if spot_check_config['data_tolerance_percentage'] > spot_check_config['data_sample_percentage']:
+        raise ValueError(
+            'The data_tolerance_percentage must be less than or equal to the data_sample_percentage'
+        )
+
+    source_paths = collect_spot_check_source_paths(
+        repository,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: {len(source_paths)} total source paths for spot check')
+
+    archive = borgmatic.borg.rlist.resolve_archive_name(
+        repository['path'],
+        'latest',
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: Using archive {archive} for spot check')
+
+    archive_paths = collect_spot_check_archive_paths(
+        repository,
+        archive,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: {len(archive_paths)} total archive paths for spot check')
+
+    # Calculate the percentage delta between the source paths count and the archive paths count, and
+    # compare that delta to the configured count tolerance percentage.
+    count_delta_percentage = abs(len(source_paths) - len(archive_paths)) / len(source_paths) * 100
+
+    if count_delta_percentage > spot_check_config['count_tolerance_percentage']:
+        logger.debug(
+            f'{log_label}: Paths in source paths but not latest archive: {", ".join(set(source_paths) - set(archive_paths)) or "none"}'
+        )
+        logger.debug(
+            f'{log_label}: Paths in latest archive but not source paths: {", ".join(set(archive_paths) - set(source_paths)) or "none"}'
+        )
+        raise ValueError(
+            f'Spot check failed: {count_delta_percentage:.2f}% file count delta between source paths and latest archive (tolerance is {spot_check_config["count_tolerance_percentage"]}%)'
+        )
+
+    failing_paths = compare_spot_check_hashes(
+        repository,
+        archive,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+        log_label,
+        source_paths,
+    )
+
+    # Error if the percentage of failing hashes exceeds the configured tolerance percentage.
+    logger.debug(f'{log_label}: {len(failing_paths)} non-matching spot check hashes')
+    data_tolerance_percentage = spot_check_config['data_tolerance_percentage']
+    failing_percentage = (len(failing_paths) / len(source_paths)) * 100
+
+    if failing_percentage > data_tolerance_percentage:
+        logger.debug(
+            f'{log_label}: Source paths with data not matching the latest archive: {", ".join(failing_paths)}'
+        )
+        raise ValueError(
+            f'Spot check failed: {failing_percentage:.2f}% of source paths with data not matching the latest archive (tolerance is {data_tolerance_percentage}%)'
+        )
+
+    logger.info(
+        f'{log_label}: Spot check passed with a {count_delta_percentage:.2f}% file count delta and a {failing_percentage:.2f}% file data delta'
+    )
+
+
 def run_check(
     config_filename,
     repository,
@@ -20,6 +577,8 @@ def run_check(
 ):
     '''
     Run the "check" action for the given repository.
+
+    Raise ValueError if the Borg repository ID cannot be determined.
     '''
     if check_arguments.repository and not borgmatic.config.validate.repositories_match(
         repository, check_arguments.repository
@@ -34,16 +593,69 @@ def run_check(
         global_arguments.dry_run,
         **hook_context,
     )
+
     logger.info(f'{repository.get("label", repository["path"])}: Running consistency checks')
-    borgmatic.borg.check.check_archives(
+    repository_id = borgmatic.borg.check.get_repository_id(
         repository['path'],
         config,
         local_borg_version,
-        check_arguments,
         global_arguments,
         local_path=local_path,
         remote_path=remote_path,
     )
+    upgrade_check_times(config, repository_id)
+    configured_checks = parse_checks(config, check_arguments.only_checks)
+    archive_filter_flags = borgmatic.borg.check.make_archive_filter_flags(
+        local_borg_version, config, configured_checks, check_arguments
+    )
+    archives_check_id = make_archives_check_id(archive_filter_flags)
+    checks = filter_checks_on_frequency(
+        config,
+        repository_id,
+        configured_checks,
+        check_arguments.force,
+        archives_check_id,
+    )
+    borg_specific_checks = set(checks).intersection({'repository', 'archives', 'data'})
+
+    if borg_specific_checks:
+        borgmatic.borg.check.check_archives(
+            repository['path'],
+            config,
+            local_borg_version,
+            check_arguments,
+            global_arguments,
+            borg_specific_checks,
+            archive_filter_flags,
+            local_path=local_path,
+            remote_path=remote_path,
+        )
+        for check in borg_specific_checks:
+            write_check_time(make_check_time_path(config, repository_id, check, archives_check_id))
+
+    if 'extract' in checks:
+        borgmatic.borg.extract.extract_last_archive_dry_run(
+            config,
+            local_borg_version,
+            global_arguments,
+            repository['path'],
+            config.get('lock_wait'),
+            local_path,
+            remote_path,
+        )
+        write_check_time(make_check_time_path(config, repository_id, 'extract'))
+
+    if 'spot' in checks:
+        spot_check(
+            repository,
+            config,
+            local_borg_version,
+            global_arguments,
+            local_path,
+            remote_path,
+        )
+        write_check_time(make_check_time_path(config, repository_id, 'spot'))
+
     borgmatic.hooks.command.execute_hook(
         config.get('after_check'),
         config.get('umask'),

+ 1 - 2
borgmatic/actions/json.py

@@ -1,6 +1,5 @@
-import logging
 import json
-
+import logging
 
 logger = logging.getLogger(__name__)
 

+ 69 - 372
borgmatic/borg/check.py

@@ -1,172 +1,26 @@
 import argparse
-import datetime
-import hashlib
-import itertools
 import json
 import logging
-import os
-import pathlib
 
-from borgmatic.borg import environment, extract, feature, flags, rinfo, state
+from borgmatic.borg import environment, feature, flags, rinfo
 from borgmatic.execute import DO_NOT_CAPTURE, execute_command
 
-DEFAULT_CHECKS = (
-    {'name': 'repository', 'frequency': '1 month'},
-    {'name': 'archives', 'frequency': '1 month'},
-)
-
-
 logger = logging.getLogger(__name__)
 
 
-def parse_checks(config, only_checks=None):
-    '''
-    Given a configuration dict with a "checks" sequence of dicts and an optional list of override
-    checks, return a tuple of named checks to run.
-
-    For example, given a config of:
-
-        {'checks': ({'name': 'repository'}, {'name': 'archives'})}
-
-    This will be returned as:
-
-        ('repository', 'archives')
-
-    If no "checks" option is present in the config, return the DEFAULT_CHECKS. If a checks value
-    has a name of "disabled", return an empty tuple, meaning that no checks should be run.
-    '''
-    checks = only_checks or tuple(
-        check_config['name'] for check_config in (config.get('checks', None) or DEFAULT_CHECKS)
-    )
-    checks = tuple(check.lower() for check in checks)
-
-    if 'disabled' in checks:
-        logger.warning(
-            'The "disabled" value for the "checks" option is deprecated and will be removed from a future release; use "skip_actions" instead'
-        )
-        if len(checks) > 1:
-            logger.warning(
-                'Multiple checks are configured, but one of them is "disabled"; not running any checks'
-            )
-        return ()
-
-    return checks
-
-
-def parse_frequency(frequency):
+def make_archive_filter_flags(local_borg_version, config, checks, check_arguments):
     '''
-    Given a frequency string with a number and a unit of time, return a corresponding
-    datetime.timedelta instance or None if the frequency is None or "always".
+    Given the local Borg version, a configuration dict, a parsed sequence of checks, and check
+    arguments as an argparse.Namespace instance, transform the checks into tuple of command-line
+    flags for filtering archives in a check command.
 
-    For instance, given "3 weeks", return datetime.timedelta(weeks=3)
-
-    Raise ValueError if the given frequency cannot be parsed.
-    '''
-    if not frequency:
-        return None
-
-    frequency = frequency.strip().lower()
-
-    if frequency == 'always':
-        return None
-
-    try:
-        number, time_unit = frequency.split(' ')
-        number = int(number)
-    except ValueError:
-        raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
-
-    if not time_unit.endswith('s'):
-        time_unit += 's'
-
-    if time_unit == 'months':
-        number *= 30
-        time_unit = 'days'
-    elif time_unit == 'years':
-        number *= 365
-        time_unit = 'days'
-
-    try:
-        return datetime.timedelta(**{time_unit: number})
-    except TypeError:
-        raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
-
-
-def filter_checks_on_frequency(
-    config,
-    borg_repository_id,
-    checks,
-    force,
-    archives_check_id=None,
-):
+    If "check_last" is set in the configuration and "archives" is in checks, then include a "--last"
+    flag. And if "prefix" is set in configuration and "archives" is in checks, then include a
+    "--match-archives" flag.
     '''
-    Given a configuration dict with a "checks" sequence of dicts, a Borg repository ID, a sequence
-    of checks, whether to force checks to run, and an ID for the archives check potentially being
-    run (if any), filter down those checks based on the configured "frequency" for each check as
-    compared to its check time file.
-
-    In other words, a check whose check time file's timestamp is too new (based on the configured
-    frequency) will get cut from the returned sequence of checks. Example:
-
-    config = {
-        'checks': [
-            {
-                'name': 'archives',
-                'frequency': '2 weeks',
-            },
-        ]
-    }
-
-    When this function is called with that config and "archives" in checks, "archives" will get
-    filtered out of the returned result if its check time file is newer than 2 weeks old, indicating
-    that it's not yet time to run that check again.
-
-    Raise ValueError if a frequency cannot be parsed.
-    '''
-    if not checks:
-        return checks
-
-    filtered_checks = list(checks)
-
-    if force:
-        return tuple(filtered_checks)
-
-    for check_config in config.get('checks', DEFAULT_CHECKS):
-        check = check_config['name']
-        if checks and check not in checks:
-            continue
-
-        frequency_delta = parse_frequency(check_config.get('frequency'))
-        if not frequency_delta:
-            continue
-
-        check_time = probe_for_check_time(config, borg_repository_id, check, archives_check_id)
-        if not check_time:
-            continue
-
-        # If we've not yet reached the time when the frequency dictates we're ready for another
-        # check, skip this check.
-        if datetime.datetime.now() < check_time + frequency_delta:
-            remaining = check_time + frequency_delta - datetime.datetime.now()
-            logger.info(
-                f'Skipping {check} check due to configured frequency; {remaining} until next check (use --force to check anyway)'
-            )
-            filtered_checks.remove(check)
-
-    return tuple(filtered_checks)
-
-
-def make_archive_filter_flags(
-    local_borg_version, config, checks, check_arguments, check_last=None, prefix=None
-):
-    '''
-    Given the local Borg version, a configuration dict, a parsed sequence of checks, check arguments
-    as an argparse.Namespace instance, the check last value, and a consistency check prefix,
-    transform the checks into tuple of command-line flags for filtering archives in a check command.
+    check_last = config.get('check_last', None)
+    prefix = config.get('prefix')
 
-    If a check_last value is given and "archives" is in checks, then include a "--last" flag. And if
-    a prefix value is given and "archives" is in checks, then include a "--match-archives" flag.
-    '''
     if 'archives' in checks or 'data' in checks:
         return (('--last', str(check_last)) if check_last else ()) + (
             (
@@ -196,17 +50,6 @@ def make_archive_filter_flags(
     return ()
 
 
-def make_archives_check_id(archive_filter_flags):
-    '''
-    Given a sequence of flags to filter archives, return a unique hash corresponding to those
-    particular flags. If there are no flags, return None.
-    '''
-    if not archive_filter_flags:
-        return None
-
-    return hashlib.sha256(' '.join(archive_filter_flags).encode()).hexdigest()
-
-
 def make_check_flags(checks, archive_filter_flags):
     '''
     Given a parsed sequence of checks and a sequence of flags to filter archives, transform the
@@ -240,144 +83,17 @@ def make_check_flags(checks, archive_filter_flags):
     )
 
 
-def make_check_time_path(config, borg_repository_id, check_type, archives_check_id=None):
-    '''
-    Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
-    "archives", etc.), and a unique hash of the archives filter flags, return a path for recording
-    that check's time (the time of that check last occurring).
-    '''
-    borgmatic_source_directory = os.path.expanduser(
-        config.get('borgmatic_source_directory', state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY)
-    )
-
-    if check_type in ('archives', 'data'):
-        return os.path.join(
-            borgmatic_source_directory,
-            'checks',
-            borg_repository_id,
-            check_type,
-            archives_check_id if archives_check_id else 'all',
-        )
-
-    return os.path.join(
-        borgmatic_source_directory,
-        'checks',
-        borg_repository_id,
-        check_type,
-    )
-
-
-def write_check_time(path):  # pragma: no cover
-    '''
-    Record a check time of now as the modification time of the given path.
-    '''
-    logger.debug(f'Writing check time at {path}')
-
-    os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
-    pathlib.Path(path, mode=0o600).touch()
-
-
-def read_check_time(path):
-    '''
-    Return the check time based on the modification time of the given path. Return None if the path
-    doesn't exist.
-    '''
-    logger.debug(f'Reading check time from {path}')
-
-    try:
-        return datetime.datetime.fromtimestamp(os.stat(path).st_mtime)
-    except FileNotFoundError:
-        return None
-
-
-def probe_for_check_time(config, borg_repository_id, check, archives_check_id):
-    '''
-    Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
-    "archives", etc.), and a unique hash of the archives filter flags, return a the corresponding
-    check time or None if such a check time does not exist.
-
-    When the check type is "archives" or "data", this function probes two different paths to find
-    the check time, e.g.:
-
-      ~/.borgmatic/checks/1234567890/archives/9876543210
-      ~/.borgmatic/checks/1234567890/archives/all
-
-    ... and returns the maximum modification time of the files found (if any). The first path
-    represents a more specific archives check time (a check on a subset of archives), and the second
-    is a fallback to the last "all" archives check.
-
-    For other check types, this function reads from a single check time path, e.g.:
-
-      ~/.borgmatic/checks/1234567890/repository
-    '''
-    check_times = (
-        read_check_time(group[0])
-        for group in itertools.groupby(
-            (
-                make_check_time_path(config, borg_repository_id, check, archives_check_id),
-                make_check_time_path(config, borg_repository_id, check),
-            )
-        )
-    )
-
-    try:
-        return max(check_time for check_time in check_times if check_time)
-    except ValueError:
-        return None
-
-
-def upgrade_check_times(config, borg_repository_id):
-    '''
-    Given a configuration dict and a Borg repository ID, upgrade any corresponding check times on
-    disk from old-style paths to new-style paths.
-
-    Currently, the only upgrade performed is renaming an archive or data check path that looks like:
-
-      ~/.borgmatic/checks/1234567890/archives
-
-    to:
-
-      ~/.borgmatic/checks/1234567890/archives/all
-    '''
-    for check_type in ('archives', 'data'):
-        new_path = make_check_time_path(config, borg_repository_id, check_type, 'all')
-        old_path = os.path.dirname(new_path)
-        temporary_path = f'{old_path}.temp'
-
-        if not os.path.isfile(old_path) and not os.path.isfile(temporary_path):
-            continue
-
-        logger.debug(f'Upgrading archives check time from {old_path} to {new_path}')
-
-        try:
-            os.rename(old_path, temporary_path)
-        except FileNotFoundError:
-            pass
-
-        os.mkdir(old_path)
-        os.rename(temporary_path, new_path)
-
-
-def check_archives(
-    repository_path,
-    config,
-    local_borg_version,
-    check_arguments,
-    global_arguments,
-    local_path='borg',
-    remote_path=None,
+def get_repository_id(
+    repository_path, config, local_borg_version, global_arguments, local_path, remote_path
 ):
     '''
-    Given a local or remote repository path, a configuration dict, the local Borg version, check
-    arguments as an argparse.Namespace instance, global arguments, and local/remote commands to run,
-    check the contained Borg archives for consistency.
-
-    If there are no consistency checks to run, skip running them.
+    Given a local or remote repository path, a configuration dict, the local Borg version, global
+    arguments, and local/remote commands to run, return the corresponding Borg repository ID.
 
-    Raises ValueError if the Borg repository ID cannot be determined.
+    Raise ValueError if the Borg repository ID cannot be determined.
     '''
     try:
-        borg_repository_id = json.loads(
+        return json.loads(
             rinfo.display_repository_info(
                 repository_path,
                 config,
@@ -391,82 +107,63 @@ def check_archives(
     except (json.JSONDecodeError, KeyError):
         raise ValueError(f'Cannot determine Borg repository ID for {repository_path}')
 
-    upgrade_check_times(config, borg_repository_id)
 
-    check_last = config.get('check_last', None)
-    prefix = config.get('prefix')
-    configured_checks = parse_checks(config, check_arguments.only_checks)
-    lock_wait = None
+def check_archives(
+    repository_path,
+    config,
+    local_borg_version,
+    check_arguments,
+    global_arguments,
+    checks,
+    archive_filter_flags,
+    local_path='borg',
+    remote_path=None,
+):
+    '''
+    Given a local or remote repository path, a configuration dict, the local Borg version, check
+    arguments as an argparse.Namespace instance, global arguments, a set of named Borg checks to run
+    (some combination "repository", "archives", and/or "data"), archive filter flags, and
+    local/remote commands to run, check the contained Borg archives for consistency.
+    '''
+    lock_wait = config.get('lock_wait')
     extra_borg_options = config.get('extra_borg_options', {}).get('check', '')
-    archive_filter_flags = make_archive_filter_flags(
-        local_borg_version, config, configured_checks, check_arguments, check_last, prefix
-    )
-    archives_check_id = make_archives_check_id(archive_filter_flags)
 
-    checks = filter_checks_on_frequency(
-        config,
-        borg_repository_id,
-        configured_checks,
-        check_arguments.force,
-        archives_check_id,
+    verbosity_flags = ()
+    if logger.isEnabledFor(logging.INFO):
+        verbosity_flags = ('--info',)
+    if logger.isEnabledFor(logging.DEBUG):
+        verbosity_flags = ('--debug', '--show-rc')
+
+    full_command = (
+        (local_path, 'check')
+        + (('--repair',) if check_arguments.repair else ())
+        + make_check_flags(checks, archive_filter_flags)
+        + (('--remote-path', remote_path) if remote_path else ())
+        + (('--log-json',) if global_arguments.log_json else ())
+        + (('--lock-wait', str(lock_wait)) if lock_wait else ())
+        + verbosity_flags
+        + (('--progress',) if check_arguments.progress else ())
+        + (tuple(extra_borg_options.split(' ')) if extra_borg_options else ())
+        + flags.make_repository_flags(repository_path, local_borg_version)
     )
 
-    if set(checks).intersection({'repository', 'archives', 'data'}):
-        lock_wait = config.get('lock_wait')
-
-        verbosity_flags = ()
-        if logger.isEnabledFor(logging.INFO):
-            verbosity_flags = ('--info',)
-        if logger.isEnabledFor(logging.DEBUG):
-            verbosity_flags = ('--debug', '--show-rc')
-
-        full_command = (
-            (local_path, 'check')
-            + (('--repair',) if check_arguments.repair else ())
-            + make_check_flags(checks, archive_filter_flags)
-            + (('--remote-path', remote_path) if remote_path else ())
-            + (('--log-json',) if global_arguments.log_json else ())
-            + (('--lock-wait', str(lock_wait)) if lock_wait else ())
-            + verbosity_flags
-            + (('--progress',) if check_arguments.progress else ())
-            + (tuple(extra_borg_options.split(' ')) if extra_borg_options else ())
-            + flags.make_repository_flags(repository_path, local_borg_version)
+    borg_environment = environment.make_environment(config)
+    borg_exit_codes = config.get('borg_exit_codes')
+
+    # The Borg repair option triggers an interactive prompt, which won't work when output is
+    # captured. And progress messes with the terminal directly.
+    if check_arguments.repair or check_arguments.progress:
+        execute_command(
+            full_command,
+            output_file=DO_NOT_CAPTURE,
+            extra_environment=borg_environment,
+            borg_local_path=local_path,
+            borg_exit_codes=borg_exit_codes,
         )
-
-        borg_environment = environment.make_environment(config)
-        borg_exit_codes = config.get('borg_exit_codes')
-
-        # The Borg repair option triggers an interactive prompt, which won't work when output is
-        # captured. And progress messes with the terminal directly.
-        if check_arguments.repair or check_arguments.progress:
-            execute_command(
-                full_command,
-                output_file=DO_NOT_CAPTURE,
-                extra_environment=borg_environment,
-                borg_local_path=local_path,
-                borg_exit_codes=borg_exit_codes,
-            )
-        else:
-            execute_command(
-                full_command,
-                extra_environment=borg_environment,
-                borg_local_path=local_path,
-                borg_exit_codes=borg_exit_codes,
-            )
-
-        for check in checks:
-            write_check_time(
-                make_check_time_path(config, borg_repository_id, check, archives_check_id)
-            )
-
-    if 'extract' in checks:
-        extract.extract_last_archive_dry_run(
-            config,
-            local_borg_version,
-            global_arguments,
-            repository_path,
-            lock_wait,
-            local_path,
-            remote_path,
+    else:
+        execute_command(
+            full_command,
+            extra_environment=borg_environment,
+            borg_local_path=local_path,
+            borg_exit_codes=borg_exit_codes,
         )
-        write_check_time(make_check_time_path(config, borg_repository_id, 'extract'))

+ 89 - 39
borgmatic/borg/create.py

@@ -275,11 +275,11 @@ def collect_special_file_paths(
     create_command, config, local_path, working_directory, borg_environment, skip_directories
 ):
     '''
-    Given a Borg create command as a tuple, a local Borg path, a working directory, a dict of
-    environment variables to pass to Borg, and a sequence of parent directories to skip, collect the
-    paths for any special files (character devices, block devices, and named pipes / FIFOs) that
-    Borg would encounter during a create. These are all paths that could cause Borg to hang if its
-    --read-special flag is used.
+    Given a Borg create command as a tuple, a configuration dict, a local Borg path, a working
+    directory, a dict of environment variables to pass to Borg, and a sequence of parent directories
+    to skip, collect the paths for any special files (character devices, block devices, and named
+    pipes / FIFOs) that Borg would encounter during a create. These are all paths that could cause
+    Borg to hang if its --read-special flag is used.
     '''
     # Omit "--exclude-nodump" from the Borg dry run command, because that flag causes Borg to open
     # files including any named pipe we've created.
@@ -320,35 +320,31 @@ def check_all_source_directories_exist(source_directories):
         raise ValueError(f"Source directories do not exist: {', '.join(missing_directories)}")
 
 
-def create_archive(
+def make_base_create_command(
     dry_run,
     repository_path,
     config,
     config_paths,
     local_borg_version,
     global_arguments,
+    borgmatic_source_directories,
     local_path='borg',
     remote_path=None,
     progress=False,
-    stats=False,
     json=False,
     list_files=False,
     stream_processes=None,
 ):
     '''
     Given vebosity/dry-run flags, a local or remote repository path, a configuration dict, a
-    sequence of loaded configuration paths, the local Borg version, and global arguments as an
-    argparse.Namespace instance, create a Borg archive and return Borg's JSON output (if any).
-
-    If a sequence of stream processes is given (instances of subprocess.Popen), then execute the
-    create command while also triggering the given processes to produce output.
+    sequence of loaded configuration paths, the local Borg version, global arguments as an
+    argparse.Namespace instance, and a sequence of borgmatic source directories, return a tuple of
+    (base Borg create command flags, Borg create command positional arguments, open pattern file
+    handle, open exclude file handle).
     '''
-    borgmatic.logger.add_custom_log_levels()
-    borgmatic_source_directories = expand_directories(
-        collect_borgmatic_source_directories(config.get('borgmatic_source_directory'))
-    )
     if config.get('source_directories_must_exist', False):
         check_all_source_directories_exist(config.get('source_directories'))
+
     sources = deduplicate_directories(
         map_directories_to_devices(
             expand_directories(
@@ -364,11 +360,6 @@ def create_archive(
 
     ensure_files_readable(config.get('patterns_from'), config.get('exclude_from'))
 
-    try:
-        working_directory = os.path.expanduser(config.get('working_directory'))
-    except TypeError:
-        working_directory = None
-
     pattern_file = (
         write_pattern_file(config.get('patterns'), sources)
         if config.get('patterns') or config.get('patterns_from')
@@ -411,11 +402,6 @@ def create_archive(
             ('--remote-ratelimit', str(upload_rate_limit)) if upload_rate_limit else ()
         )
 
-    if stream_processes and config.get('read_special') is False:
-        logger.warning(
-            f'{repository_path}: Ignoring configured "read_special" value of false, as true is needed for database hooks.'
-        )
-
     create_flags = (
         tuple(local_path.split(' '))
         + ('create',)
@@ -451,22 +437,19 @@ def create_archive(
         repository_path, archive_name_format, local_borg_version
     ) + (sources if not pattern_file else ())
 
-    if json:
-        output_log_level = None
-    elif list_files or (stats and not dry_run):
-        output_log_level = logging.ANSWER
-    else:
-        output_log_level = logging.INFO
-
-    # The progress output isn't compatible with captured and logged output, as progress messes with
-    # the terminal directly.
-    output_file = DO_NOT_CAPTURE if progress else None
-
-    borg_environment = environment.make_environment(config)
-
     # If database hooks are enabled (as indicated by streaming processes), exclude files that might
     # cause Borg to hang. But skip this if the user has explicitly set the "read_special" to True.
     if stream_processes and not config.get('read_special'):
+        logger.warning(
+            f'{repository_path}: Ignoring configured "read_special" value of false, as true is needed for database hooks.'
+        )
+        try:
+            working_directory = os.path.expanduser(config.get('working_directory'))
+        except TypeError:
+            working_directory = None
+
+        borg_environment = environment.make_environment(config)
+
         logger.debug(f'{repository_path}: Collecting special file paths')
         special_file_paths = collect_special_file_paths(
             create_flags + create_positional_arguments,
@@ -489,6 +472,73 @@ def create_archive(
             )
             create_flags += make_exclude_flags(config, exclude_file.name)
 
+    return (create_flags, create_positional_arguments, pattern_file, exclude_file)
+
+
+def create_archive(
+    dry_run,
+    repository_path,
+    config,
+    config_paths,
+    local_borg_version,
+    global_arguments,
+    local_path='borg',
+    remote_path=None,
+    progress=False,
+    stats=False,
+    json=False,
+    list_files=False,
+    stream_processes=None,
+):
+    '''
+    Given vebosity/dry-run flags, a local or remote repository path, a configuration dict, a
+    sequence of loaded configuration paths, the local Borg version, and global arguments as an
+    argparse.Namespace instance, create a Borg archive and return Borg's JSON output (if any).
+
+    If a sequence of stream processes is given (instances of subprocess.Popen), then execute the
+    create command while also triggering the given processes to produce output.
+    '''
+    borgmatic.logger.add_custom_log_levels()
+    borgmatic_source_directories = expand_directories(
+        collect_borgmatic_source_directories(config.get('borgmatic_source_directory'))
+    )
+
+    (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
+        make_base_create_command(
+            dry_run,
+            repository_path,
+            config,
+            config_paths,
+            local_borg_version,
+            global_arguments,
+            borgmatic_source_directories,
+            local_path,
+            remote_path,
+            progress,
+            json,
+            list_files,
+            stream_processes,
+        )
+    )
+
+    if json:
+        output_log_level = None
+    elif list_files or (stats and not dry_run):
+        output_log_level = logging.ANSWER
+    else:
+        output_log_level = logging.INFO
+
+    # The progress output isn't compatible with captured and logged output, as progress messes with
+    # the terminal directly.
+    output_file = DO_NOT_CAPTURE if progress else None
+
+    try:
+        working_directory = os.path.expanduser(config.get('working_directory'))
+    except TypeError:
+        working_directory = None
+
+    borg_environment = environment.make_environment(config)
+
     create_flags += (
         (('--info',) if logger.getEffectiveLevel() == logging.INFO and not json else ())
         + (('--stats',) if stats and not json and not dry_run else ())

+ 5 - 4
borgmatic/borg/list.py

@@ -95,14 +95,15 @@ def capture_archive_listing(
     local_borg_version,
     global_arguments,
     list_paths=None,
+    path_format=None,
     local_path='borg',
     remote_path=None,
 ):
     '''
     Given a local or remote repository path, an archive name, a configuration dict, the local Borg
-    version, global arguments as an argparse.Namespace, the archive paths in which to list files, and
-    local and remote Borg paths, capture the output of listing that archive and return it as a list
-    of file paths.
+    version, global arguments as an argparse.Namespace, the archive paths in which to list files,
+    the Borg path format to use for the output, and local and remote Borg paths, capture the output
+    of listing that archive and return it as a list of file paths.
     '''
     borg_environment = environment.make_environment(config)
 
@@ -118,7 +119,7 @@ def capture_archive_listing(
                     paths=[f'sh:{path}' for path in list_paths] if list_paths else None,
                     find_paths=None,
                     json=None,
-                    format='{path}{NL}',  # noqa: FS003
+                    format=path_format or '{path}{NL}',  # noqa: FS003
                 ),
                 global_arguments,
                 local_path,

+ 2 - 2
borgmatic/commands/arguments.py

@@ -614,10 +614,10 @@ def make_parsers():
     check_group.add_argument(
         '--only',
         metavar='CHECK',
-        choices=('repository', 'archives', 'data', 'extract'),
+        choices=('repository', 'archives', 'data', 'extract', 'spot'),
         dest='only_checks',
         action='append',
-        help='Run a particular consistency check (repository, archives, data, or extract) instead of configured checks (subject to configured frequency, can specify flag multiple times)',
+        help='Run a particular consistency check (repository, archives, data, extract, or spot) instead of configured checks (subject to configured frequency, can specify flag multiple times)',
     )
     check_group.add_argument(
         '--force',

+ 16 - 3
borgmatic/config/generate.py

@@ -21,6 +21,19 @@ def insert_newline_before_comment(config, field_name):
     )
 
 
+def get_properties(schema):
+    '''
+    Given a schema dict, return its properties. But if it's got sub-schemas with multiple different
+    potential properties, returned their merged properties instead.
+    '''
+    if 'oneOf' in schema:
+        return dict(
+            collections.ChainMap(*[sub_schema['properties'] for sub_schema in schema['oneOf']])
+        )
+
+    return schema['properties']
+
+
 def schema_to_sample_configuration(schema, level=0, parent_is_sequence=False):
     '''
     Given a loaded configuration schema, generate and return sample config for it. Include comments
@@ -40,7 +53,7 @@ def schema_to_sample_configuration(schema, level=0, parent_is_sequence=False):
         config = ruamel.yaml.comments.CommentedMap(
             [
                 (field_name, schema_to_sample_configuration(sub_schema, level + 1))
-                for field_name, sub_schema in schema['properties'].items()
+                for field_name, sub_schema in get_properties(schema).items()
             ]
         )
         indent = (level * INDENT) + (SEQUENCE_INDENT if parent_is_sequence else 0)
@@ -151,7 +164,7 @@ def add_comments_to_configuration_sequence(config, schema, indent=0):
         return
 
     for field_name in config[0].keys():
-        field_schema = schema['items']['properties'].get(field_name, {})
+        field_schema = get_properties(schema['items']).get(field_name, {})
         description = field_schema.get('description')
 
         # No description to use? Skip it.
@@ -178,7 +191,7 @@ def add_comments_to_configuration_object(config, schema, indent=0, skip_first=Fa
         if skip_first and index == 0:
             continue
 
-        field_schema = schema['properties'].get(field_name, {})
+        field_schema = get_properties(schema).get(field_name, {})
         description = field_schema.get('description', '').strip()
 
         # If this is an optional key, add an indicator to the comment flagging it to be commented

+ 114 - 31
borgmatic/config/schema.yaml

@@ -503,37 +503,120 @@ properties:
         type: array
         items:
             type: object
-            required: ['name']
-            additionalProperties: false
-            properties:
-                name:
-                    type: string
-                    enum:
-                        - repository
-                        - archives
-                        - data
-                        - extract
-                        - disabled
-                    description: |
-                        Name of consistency check to run: "repository",
-                        "archives", "data", and/or "extract". "repository"
-                        checks the consistency of the repository, "archives"
-                        checks all of the archives, "data" verifies the
-                        integrity of the data within the archives, and "extract"
-                        does an extraction dry-run of the most recent archive.
-                        Note that "data" implies "archives". See "skip_actions"
-                        for disabling checks altogether.
-                    example: repository
-                frequency:
-                    type: string
-                    description: |
-                        How frequently to run this type of consistency check (as
-                        a best effort). The value is a number followed by a unit
-                        of time. E.g., "2 weeks" to run this consistency check
-                        no more than every two weeks for a given repository or
-                        "1 month" to run it no more than monthly. Defaults to
-                        "always": running this check every time checks are run.
-                    example: 2 weeks
+            oneOf:
+                - required: [name]
+                  additionalProperties: false
+                  properties:
+                      name:
+                          type: string
+                          enum:
+                              - repository
+                              - archives
+                              - data
+                              - extract
+                              - disabled
+                          description: |
+                              Name of consistency check to run: "repository",
+                              "archives", "data", "spot", and/or "extract".
+                              "repository" checks the consistency of the
+                              repository, "archives" checks all of the
+                              archives, "data" verifies the integrity of the
+                              data within the archives, "spot" checks that
+                              some percentage of source files are found in the
+                              most recent archive (with identical contents),
+                              and "extract" does an extraction dry-run of the
+                              most recent archive. Note that "data" implies
+                              "archives". See "skip_actions" for disabling
+                              checks altogether.
+                          example: spot
+                      frequency:
+                          type: string
+                          description: |
+                              How frequently to run this type of consistency
+                              check (as a best effort). The value is a number
+                              followed by a unit of time. E.g., "2 weeks" to
+                              run this consistency check no more than every
+                              two weeks for a given repository or "1 month" to
+                              run it no more than monthly. Defaults to
+                              "always": running this check every time checks
+                              are run.
+                          example: 2 weeks
+                - required:
+                    - name
+                    - count_tolerance_percentage
+                    - data_sample_percentage
+                    - data_tolerance_percentage
+                  additionalProperties: false
+                  properties:
+                      name:
+                          type: string
+                          enum:
+                              - spot
+                          description: |
+                              Name of consistency check to run: "repository",
+                              "archives", "data", "spot", and/or "extract".
+                              "repository" checks the consistency of the
+                              repository, "archives" checks all of the
+                              archives, "data" verifies the integrity of the
+                              data within the archives, "spot" checks that
+                              some percentage of source files are found in the
+                              most recent archive (with identical contents),
+                              and "extract" does an extraction dry-run of the
+                              most recent archive. Note that "data" implies
+                              "archives". See "skip_actions" for disabling
+                              checks altogether.
+                          example: repository
+                      frequency:
+                          type: string
+                          description: |
+                              How frequently to run this type of consistency
+                              check (as a best effort). The value is a number
+                              followed by a unit of time. E.g., "2 weeks" to
+                              run this consistency check no more than every
+                              two weeks for a given repository or "1 month" to
+                              run it no more than monthly. Defaults to
+                              "always": running this check every time checks
+                              are run.
+                          example: 2 weeks
+                      count_tolerance_percentage:
+                          type: number
+                          description: |
+                              The percentage delta between the source
+                              directories file count and the most recent backup
+                              archive file count that is allowed before the
+                              entire consistency check fails. This can catch
+                              problems like incorrect excludes, inadvertent
+                              deletes, etc. Only applies to the "spot" check.
+                          example: 10
+                      data_sample_percentage:
+                          type: number
+                          description: |
+                              The percentage of total files in the source
+                              directories to randomly sample and compare to
+                              their corresponding files in the most recent
+                              backup archive. Only applies to the "spot" check.
+                          example: 1
+                      data_tolerance_percentage:
+                          type: number
+                          description: |
+                              The percentage of total files in the source
+                              directories that can fail a spot check comparison
+                              without failing the entire consistency check. This
+                              can catch problems like source files that have
+                              been bulk-changed by malware, backups that have
+                              been tampered with, etc. The value must be lower
+                              than or equal to the "contents_sample_percentage".
+                              Only applies to the "spot" check.
+                          example: 0.5
+                      xxh64sum_command:
+                          type: string
+                          description: |
+                              Command to use instead of "xxh64sum" to hash
+                              source files, usually found in an OS package named
+                              "xxhash". Do not substitute with a different hash
+                              type (SHA, MD5, etc.) or the check will never
+                              succeed. Only applies to the "spot" check.
+                          example: /usr/local/bin/xxh64sum
         description: |
             List of one or more consistency checks to run on a periodic basis
             (if "frequency" is set) or every time borgmatic runs checks (if

+ 11 - 1
borgmatic/execute.py

@@ -4,6 +4,7 @@ import logging
 import os
 import select
 import subprocess
+import textwrap
 
 logger = logging.getLogger(__name__)
 
@@ -219,13 +220,22 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path, b
         }
 
 
+MAX_LOGGED_COMMAND_LENGTH = 1000
+
+
 def log_command(full_command, input_file=None, output_file=None, environment=None):
     '''
     Log the given command (a sequence of command/argument strings), along with its input/output file
     paths and extra environment variables (with omitted values in case they contain passwords).
     '''
     logger.debug(
-        ' '.join(tuple(f'{key}=***' for key in (environment or {}).keys()) + tuple(full_command))
+        textwrap.shorten(
+            ' '.join(
+                tuple(f'{key}=***' for key in (environment or {}).keys()) + tuple(full_command)
+            ),
+            width=MAX_LOGGED_COMMAND_LENGTH,
+            placeholder=' ...',
+        )
         + (f" < {getattr(input_file, 'name', '')}" if input_file else '')
         + (f" > {getattr(output_file, 'name', '')}" if output_file else '')
     )

+ 8 - 0
borgmatic/hooks/mariadb.py

@@ -115,6 +115,14 @@ def execute_dump_command(
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MariaDB database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MariaDB databases to a named pipe. The databases are supplied as a sequence of

+ 8 - 0
borgmatic/hooks/mongodb.py

@@ -16,6 +16,14 @@ def make_dump_path(config):  # pragma: no cover
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MongoDB database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(database.get('format') != 'directory' for database in databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MongoDB databases to a named pipe. The databases are supplied as a sequence of

+ 8 - 0
borgmatic/hooks/mysql.py

@@ -114,6 +114,14 @@ def execute_dump_command(
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MySQL database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MySQL/MariaDB databases to a named pipe. The databases are supplied as a sequence

+ 8 - 0
borgmatic/hooks/postgresql.py

@@ -96,6 +96,14 @@ def database_names_to_dump(database, extra_environment, log_prefix, dry_run):
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of PostgreSQL database configuration dicts, a configuration dict (ignored), and
+    a log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(database.get('format') != 'directory' for database in databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given PostgreSQL databases to a named pipe. The databases are supplied as a sequence of

+ 12 - 4
borgmatic/hooks/sqlite.py

@@ -17,9 +17,17 @@ def make_dump_path(config):  # pragma: no cover
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of SQLite database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
-    Dump the given SQLite3 databases to a named pipe. The databases are supplied as a sequence of
+    Dump the given SQLite databases to a named pipe. The databases are supplied as a sequence of
     configuration dicts, as per the configuration schema. Use the given configuration dict to
     construct the destination path and the given log prefix in any log entries.
 
@@ -71,7 +79,7 @@ def dump_data_sources(databases, config, log_prefix, dry_run):
 
 def remove_data_source_dumps(databases, config, log_prefix, dry_run):  # pragma: no cover
     '''
-    Remove the given SQLite3 database dumps from the filesystem. The databases are supplied as a
+    Remove the given SQLite database dumps from the filesystem. The databases are supplied as a
     sequence of configuration dicts, as per the configuration schema. Use the given configuration
     dict to construct the destination path and the given log prefix in any log entries. If this is a
     dry run, then don't actually remove anything.
@@ -81,8 +89,8 @@ def remove_data_source_dumps(databases, config, log_prefix, dry_run):  # pragma:
 
 def make_data_source_dump_pattern(databases, config, log_prefix, name=None):  # pragma: no cover
     '''
-    Make a pattern that matches the given SQLite3 databases. The databases are supplied as a
-    sequence of configuration dicts, as per the configuration schema.
+    Make a pattern that matches the given SQLite databases. The databases are supplied as a sequence
+    of configuration dicts, as per the configuration schema.
     '''
     return dump.make_data_source_dump_filename(make_dump_path(config), name)
 

+ 80 - 1
docs/how-to/deal-with-very-large-backups.md

@@ -91,8 +91,9 @@ Here are the available checks from fastest to slowest:
 
  * `repository`: Checks the consistency of the repository itself.
  * `archives`: Checks all of the archives in the repository.
- * `extract`: Performs an extraction dry-run of the most recent archive.
+ * `extract`: Performs an extraction dry-run of the latest archive.
  * `data`: Verifies the data integrity of all archives contents, decrypting and decompressing all data.
+ * `spot`: Compares file counts and contents between your source files and the latest archive.
 
 Note that the `data` check is a more thorough version of the `archives` check,
 so enabling the `data` check implicitly enables the `archives` check as well.
@@ -102,6 +103,84 @@ documentation](https://borgbackup.readthedocs.io/en/stable/usage/check.html)
 for more information.
 
 
+### Spot check
+
+The various consistency checks all have trade-offs around speed and
+thoroughness, but most of them don't even look at your original source
+files—arguably one important way to ensure your backups contain the files
+you'll want to restore in the case of catastrophe (or just an accidentally
+deleted file). Because if something goes wrong with your source files, most
+consistency checks will still pass with flying colors and you won't discover
+there's a problem until you go to restore.
+
+<span class="minilink minilink-addedin">New in version 1.8.10</span> <span
+class="minilink minilink-addedin">Beta feature</span> That's where the spot
+check comes in. This check actually compares your source file counts and data
+against those in the latest archive, potentially catching problems like
+incorrect excludes, inadvertent deletes, files changed by malware, etc.
+
+However, because an exhaustive comparison of all source files against the
+latest archive might be too slow, the spot check supports *sampling* a
+percentage of your source files for the comparison, ensuring it falls within
+configured tolerances.
+
+Here's how it works. Start by installing the `xxhash` OS package if you don't
+already have it, so the spot check can run the `xxh64sum` command and
+efficiently hash files for comparison. Then add something like the following
+to your borgmatic configuration:
+
+```yaml
+checks:
+    - name: spot
+      count_tolerance_percentage: 10
+      data_sample_percentage: 1
+      data_tolerance_percentage: 0.5
+```
+
+The `count_tolerance_percentage` is the percentage delta between the source
+directories file count and the latest backup archive file count that is
+allowed before the entire consistency check fails. For instance, if the spot
+check runs and finds 100 source files on disk and 105 files in the latest
+archive, that would be within the configured 10% count tolerance and the check
+would succeed. But if there were 100 source files and 200 archive files, the
+check would fail. (100 source files and only 50 archive files would also
+fail.)
+
+The `data_sample_percentage` is the percentage of total files in the source
+directories to randomly sample and compare to their corresponding files in the
+latest backup archive. A higher value allows a more accurate check—and a
+slower one. The comparison is performed by hashing the selected files in each
+of the source paths and counting hashes that don't match the latest archive.
+For instance, if you have 1,000 source files and your sample percentage is 1%,
+then only 10 source files will be compared against the latest archive. These
+sampled files are selected randomly each time, so in effect the spot check is
+probabilistic.
+
+The `data_tolerance_percentage` is the percentage of total files in the source
+directories that can fail a spot check data comparison without failing the
+entire consistency check. The value must be lower than or equal to the
+`contents_sample_percentage`.
+
+All three options are required when using the spot check. And because the
+check relies on these configured tolerances, it may not be a
+set-it-and-forget-it type of consistency check, at least until you get the
+tolerances dialed in so there are minimal false positives or negatives. It is
+recommended you run `borgmatic check` several times after configuring the spot
+check, tweaking your tolerances as needed. For certain workloads where your
+source files experience wild swings of file contents or counts, the spot check
+may not suitable at all.
+
+What if you add, delete, or change a bunch of your source files and you don't
+want the spot check to fail the next time it's run? Run `borgmatic create` to
+create a new backup, thereby allowing the next spot check to run against an
+archive that contains your recent changes.
+
+As long as the spot check feature is in beta, it may be subject to breaking
+changes. But feel free to use it in production if you're okay with that
+caveat, and please [provide any
+feedback](https://torsion.org/borgmatic/#issues) you have on this feature.
+
+
 ### Check frequency
 
 <span class="minilink minilink-addedin">New in version 1.6.2</span> You can

File diff suppressed because it is too large
+ 1014 - 4
tests/unit/actions/test_check.py


+ 0 - 1
tests/unit/actions/test_json.py

@@ -1,5 +1,4 @@
 import pytest
-from flexmock import flexmock
 
 from borgmatic.actions import json as module
 

File diff suppressed because it is too large
+ 102 - 637
tests/unit/borg/test_check.py


File diff suppressed because it is too large
+ 135 - 947
tests/unit/borg/test_create.py


+ 48 - 5
tests/unit/config/test_generate.py

@@ -6,9 +6,48 @@ from flexmock import flexmock
 from borgmatic.config import generate as module
 
 
+def test_get_properties_with_simple_object():
+    schema = {
+        'type': 'object',
+        'properties': OrderedDict(
+            [
+                ('field1', {'example': 'Example'}),
+            ]
+        ),
+    }
+
+    assert module.get_properties(schema) == schema['properties']
+
+
+def test_get_properties_merges_one_of_list_properties():
+    schema = {
+        'type': 'object',
+        'oneOf': [
+            {
+                'properties': OrderedDict(
+                    [
+                        ('field1', {'example': 'Example 1'}),
+                        ('field2', {'example': 'Example 2'}),
+                    ]
+                ),
+            },
+            {
+                'properties': OrderedDict(
+                    [
+                        ('field2', {'example': 'Example 2'}),
+                        ('field3', {'example': 'Example 3'}),
+                    ]
+                ),
+            },
+        ],
+    }
+
+    assert module.get_properties(schema) == dict(
+        schema['oneOf'][0]['properties'], **schema['oneOf'][1]['properties']
+    )
+
+
 def test_schema_to_sample_configuration_generates_config_map_with_examples():
-    flexmock(module.ruamel.yaml.comments).should_receive('CommentedMap').replace_with(OrderedDict)
-    flexmock(module).should_receive('add_comments_to_configuration_object')
     schema = {
         'type': 'object',
         'properties': OrderedDict(
@@ -19,6 +58,9 @@ def test_schema_to_sample_configuration_generates_config_map_with_examples():
             ]
         ),
     }
+    flexmock(module).should_receive('get_properties').and_return(schema['properties'])
+    flexmock(module.ruamel.yaml.comments).should_receive('CommentedMap').replace_with(OrderedDict)
+    flexmock(module).should_receive('add_comments_to_configuration_object')
 
     config = module.schema_to_sample_configuration(schema)
 
@@ -42,9 +84,6 @@ def test_schema_to_sample_configuration_generates_config_sequence_of_strings_wit
 
 
 def test_schema_to_sample_configuration_generates_config_sequence_of_maps_with_examples():
-    flexmock(module.ruamel.yaml.comments).should_receive('CommentedSeq').replace_with(list)
-    flexmock(module).should_receive('add_comments_to_configuration_sequence')
-    flexmock(module).should_receive('add_comments_to_configuration_object')
     schema = {
         'type': 'array',
         'items': {
@@ -54,6 +93,10 @@ def test_schema_to_sample_configuration_generates_config_sequence_of_maps_with_e
             ),
         },
     }
+    flexmock(module).should_receive('get_properties').and_return(schema['items']['properties'])
+    flexmock(module.ruamel.yaml.comments).should_receive('CommentedSeq').replace_with(list)
+    flexmock(module).should_receive('add_comments_to_configuration_sequence')
+    flexmock(module).should_receive('add_comments_to_configuration_object')
 
     config = module.schema_to_sample_configuration(schema)
 

+ 10 - 0
tests/unit/hooks/test_mariadb.py

@@ -44,6 +44,16 @@ def test_database_names_to_dump_queries_mariadb_for_database_names():
     assert names == ('foo', 'bar')
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_dumps_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 20 - 0
tests/unit/hooks/test_mongodb.py

@@ -5,6 +5,26 @@ from flexmock import flexmock
 from borgmatic.hooks import mongodb as module
 
 
+def test_use_streaming_true_for_any_non_directory_format_databases():
+    assert module.use_streaming(
+        databases=[{'format': 'stuff'}, {'format': 'directory'}, {}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_all_directory_format_databases():
+    assert not module.use_streaming(
+        databases=[{'format': 'directory'}, {'format': 'directory'}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_runs_mongodump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 10 - 0
tests/unit/hooks/test_mysql.py

@@ -44,6 +44,16 @@ def test_database_names_to_dump_queries_mysql_for_database_names():
     assert names == ('foo', 'bar')
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_dumps_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 20 - 0
tests/unit/hooks/test_postgresql.py

@@ -199,6 +199,26 @@ def test_database_names_to_dump_with_all_and_psql_command_uses_custom_command():
     )
 
 
+def test_use_streaming_true_for_any_non_directory_format_databases():
+    assert module.use_streaming(
+        databases=[{'format': 'stuff'}, {'format': 'directory'}, {}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_all_directory_format_databases():
+    assert not module.use_streaming(
+        databases=[{'format': 'directory'}, {'format': 'directory'}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_runs_pg_dump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 10 - 0
tests/unit/hooks/test_sqlite.py

@@ -5,6 +5,16 @@ from flexmock import flexmock
 from borgmatic.hooks import sqlite as module
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_logs_and_skips_if_dump_already_exists():
     databases = [{'path': '/path/to/database', 'name': 'database'}]
 

+ 7 - 0
tests/unit/test_execute.py

@@ -123,6 +123,13 @@ def test_append_last_lines_with_output_log_level_none_appends_captured_output():
         (('foo', 'bar'), None, None, None, 'foo bar'),
         (('foo', 'bar'), flexmock(name='input'), None, None, 'foo bar < input'),
         (('foo', 'bar'), None, flexmock(name='output'), None, 'foo bar > output'),
+        (
+            ('A',) * module.MAX_LOGGED_COMMAND_LENGTH,
+            None,
+            None,
+            None,
+            'A ' * (module.MAX_LOGGED_COMMAND_LENGTH // 2 - 2) + '...',
+        ),
         (
             ('foo', 'bar'),
             flexmock(name='input'),

Some files were not shown because too many files changed in this diff