Ver código fonte

Spot check basically complete other than docs (#656).

Dan Helfman 1 ano atrás
pai
commit
4c2eb2bfe3

+ 5 - 0
NEWS

@@ -1,4 +1,9 @@
 1.8.10.dev0
+ * #656: Add a "spot" consistency check that compares file counts and contents between your source
+   files and the latest archive, ensuring they fall within configured tolerances. This can catch
+   problems like incorrect excludes, inadvertent deletes, files changed by malware, etc. See the
+   documentation for more information:
+   https://torsion.org/borgmatic/docs/how-to/deal-with-very-large-backups/#spot-check
  * #842: When a command hook exits with a soft failure, ping the log and finish states for any
    configured monitoring hooks.
  * #843: Add documentation link to Loki dashboard for borgmatic:

+ 286 - 8
borgmatic/actions/check.py

@@ -4,11 +4,17 @@ import itertools
 import logging
 import os
 import pathlib
+import random
 
 import borgmatic.borg.check
+import borgmatic.borg.create
+import borgmatic.borg.environment
 import borgmatic.borg.extract
+import borgmatic.borg.list
+import borgmatic.borg.rlist
 import borgmatic.borg.state
 import borgmatic.config.validate
+import borgmatic.execute
 import borgmatic.hooks.command
 
 DEFAULT_CHECKS = (
@@ -288,6 +294,276 @@ def upgrade_check_times(config, borg_repository_id):
         os.rename(temporary_path, new_path)
 
 
+def collect_spot_check_source_paths(
+    repository, config, local_borg_version, global_arguments, local_path, remote_path
+):
+    '''
+    Given a repository configuration dict, a configuration dict, the local Borg version, global
+    arguments as an argparse.Namespace instance, the local Borg path, and the remote Borg path,
+    collect the source paths that Borg would use in an actual create (but only include files and
+    symlinks).
+    '''
+    stream_processes = any(
+        borgmatic.hooks.dispatch.call_hooks(
+            'use_streaming',
+            config,
+            repository['path'],
+            borgmatic.hooks.dump.DATA_SOURCE_HOOK_NAMES,
+        ).values()
+    )
+
+    (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
+        borgmatic.borg.create.make_base_create_command(
+            dry_run=True,
+            repository_path=repository['path'],
+            config=config,
+            config_paths=(),
+            local_borg_version=local_borg_version,
+            global_arguments=global_arguments,
+            borgmatic_source_directories=(),
+            local_path=local_path,
+            remote_path=remote_path,
+            list_files=True,
+            stream_processes=stream_processes,
+        )
+    )
+    borg_environment = borgmatic.borg.environment.make_environment(config)
+
+    try:
+        working_directory = os.path.expanduser(config.get('working_directory'))
+    except TypeError:
+        working_directory = None
+
+    paths_output = borgmatic.execute.execute_command_and_capture_output(
+        create_flags + create_positional_arguments,
+        capture_stderr=True,
+        working_directory=working_directory,
+        extra_environment=borg_environment,
+        borg_local_path=local_path,
+        borg_exit_codes=config.get('borg_exit_codes'),
+    )
+
+    paths = tuple(
+        path_line.split(' ', 1)[1]
+        for path_line in paths_output.split('\n')
+        if path_line and path_line.startswith('- ') or path_line.startswith('+ ')
+    )
+
+    return tuple(path for path in paths if os.path.isfile(path) or os.path.islink(path))
+
+
+BORG_DIRECTORY_FILE_TYPE = 'd'
+
+
+def collect_spot_check_archive_paths(
+    repository, archive, config, local_borg_version, global_arguments, local_path, remote_path
+):
+    '''
+    Given a repository configuration dict, the name of the latest archive, a configuration dict, the
+    local Borg version, global arguments as an argparse.Namespace instance, the local Borg path, and
+    the remote Borg path, collect the paths from the given archive (but only include files and
+    symlinks).
+    '''
+    borgmatic_source_directory = os.path.expanduser(
+        config.get(
+            'borgmatic_source_directory', borgmatic.borg.state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY
+        )
+    )
+
+    return tuple(
+        path
+        for line in borgmatic.borg.list.capture_archive_listing(
+            repository['path'],
+            archive,
+            config,
+            local_borg_version,
+            global_arguments,
+            path_format='{type} /{path}{NL}',  # noqa: FS003
+            local_path=local_path,
+            remote_path=remote_path,
+        )
+        for (file_type, path) in (line.split(' ', 1),)
+        if file_type != BORG_DIRECTORY_FILE_TYPE
+        if pathlib.Path(borgmatic_source_directory) not in pathlib.Path(path).parents
+    )
+
+
+def compare_spot_check_hashes(
+    repository,
+    archive,
+    config,
+    local_borg_version,
+    global_arguments,
+    local_path,
+    remote_path,
+    log_label,
+    source_paths,
+):
+    '''
+    Given a repository configuration dict, the name of the latest archive, a configuration dict, the
+    local Borg version, global arguments as an argparse.Namespace instance, the local Borg path, the
+    remote Borg path, a log label, and spot check source paths, compare the hashes for a sampling of
+    the source paths with hashes from corresponding paths in the given archive. Return a sequence of
+    the paths that fail that hash comparison.
+    '''
+    # Based on the configured sample percentage, come up with a list of random sample files from the
+    # source directories.
+    spot_check_config = next(check for check in config['checks'] if check['name'] == 'spot')
+    sample_count = max(
+        int(len(source_paths) * (spot_check_config['data_sample_percentage'] / 100)), 1
+    )
+    source_sample_paths = tuple(random.sample(source_paths, sample_count))
+    existing_source_sample_paths = {
+        source_path for source_path in source_sample_paths if os.path.exists(source_path)
+    }
+    logger.debug(
+        f'{log_label}: Sampling {sample_count} source paths (~{spot_check_config["data_sample_percentage"]}%) for spot check'
+    )
+
+    # Hash each file in the sample paths (if it exists).
+    hash_output = borgmatic.execute.execute_command_and_capture_output(
+        (spot_check_config.get('xxh64sum_command', 'xxh64sum'),)
+        + tuple(path for path in source_sample_paths if path in existing_source_sample_paths)
+    )
+
+    source_hashes = dict(
+        (reversed(line.split('  ', 1)) for line in hash_output.splitlines()),
+        **{path: '' for path in source_sample_paths if path not in existing_source_sample_paths},
+    )
+
+    archive_hashes = dict(
+        reversed(line.split(' ', 1))
+        for line in borgmatic.borg.list.capture_archive_listing(
+            repository['path'],
+            archive,
+            config,
+            local_borg_version,
+            global_arguments,
+            list_paths=source_sample_paths,
+            path_format='{xxh64} /{path}{NL}',  # noqa: FS003
+            local_path=local_path,
+            remote_path=remote_path,
+        )
+        if line
+    )
+
+    # Compare the source hashes with the archive hashes to see how many match.
+    failing_paths = []
+
+    for path, source_hash in source_hashes.items():
+        archive_hash = archive_hashes.get(path)
+
+        if archive_hash is not None and archive_hash == source_hash:
+            continue
+
+        failing_paths.append(path)
+
+    return tuple(failing_paths)
+
+
+def spot_check(
+    repository,
+    config,
+    local_borg_version,
+    global_arguments,
+    local_path,
+    remote_path,
+):
+    '''
+    Given a repository dict, a loaded configuration dict, the local Borg version, global arguments
+    as an argparse.Namespace instance, the local Borg path, and the remote Borg path, perform a spot
+    check for the latest archive in the given repository.
+
+    A spot check compares file counts and also the hashes for a random sampling of source files on
+    disk to those stored in the latest archive. If any differences are beyond configured tolerances,
+    then the check fails.
+    '''
+    log_label = f'{repository.get("label", repository["path"])}'
+    logger.debug(f'{log_label}: Running spot check')
+    spot_check_config = next(check for check in config['checks'] if check['name'] == 'spot')
+
+    if spot_check_config['data_tolerance_percentage'] > spot_check_config['data_sample_percentage']:
+        raise ValueError(
+            'The data_tolerance_percentage must be less than or equal to the data_sample_percentage'
+        )
+
+    source_paths = collect_spot_check_source_paths(
+        repository,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: {len(source_paths)} total source paths for spot check')
+
+    archive = borgmatic.borg.rlist.resolve_archive_name(
+        repository['path'],
+        'latest',
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: Using archive {archive} for spot check')
+
+    archive_paths = collect_spot_check_archive_paths(
+        repository,
+        archive,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+    )
+    logger.debug(f'{log_label}: {len(archive_paths)} total archive paths for spot check')
+
+    # Calculate the percentage delta between the source paths count and the archive paths count, and
+    # compare that delta to the configured count tolerance percentage.
+    count_delta_percentage = abs(len(source_paths) - len(archive_paths)) / len(source_paths) * 100
+
+    if count_delta_percentage > spot_check_config['count_tolerance_percentage']:
+        logger.debug(
+            f'{log_label}: Paths in source paths but not latest archive: {", ".join(set(source_paths) - set(archive_paths)) or "none"}'
+        )
+        logger.debug(
+            f'{log_label}: Paths in latest archive but not source paths: {", ".join(set(archive_paths) - set(source_paths)) or "none"}'
+        )
+        raise ValueError(
+            f'Spot check failed: {count_delta_percentage:.2f}% file count delta between source paths and latest archive (tolerance is {spot_check_config["count_tolerance_percentage"]}%)'
+        )
+
+    failing_paths = compare_spot_check_hashes(
+        repository,
+        archive,
+        config,
+        local_borg_version,
+        global_arguments,
+        local_path,
+        remote_path,
+        log_label,
+        source_paths,
+    )
+
+    # Error if the percentage of failing hashes exceeds the configured tolerance percentage.
+    logger.debug(f'{log_label}: {len(failing_paths)} non-matching spot check hashes')
+    data_tolerance_percentage = spot_check_config['data_tolerance_percentage']
+    failing_percentage = (len(failing_paths) / len(source_paths)) * 100
+
+    if failing_percentage > data_tolerance_percentage:
+        logger.debug(
+            f'{log_label}: Source paths with data not matching the latest archive: {", ".join(failing_paths)}'
+        )
+        raise ValueError(
+            f'Spot check failed: {failing_percentage:.2f}% of source paths with data not matching the latest archive (tolerance is {data_tolerance_percentage}%)'
+        )
+
+    logger.info(
+        f'{log_label}: Spot check passed with a {count_delta_percentage:.2f}% file count delta and a {failing_percentage:.2f}% file data delta'
+    )
+
+
 def run_check(
     config_filename,
     repository,
@@ -369,14 +645,16 @@ def run_check(
         )
         write_check_time(make_check_time_path(config, repository_id, 'extract'))
 
-    # if 'spot' in checks:
-    # TODO:
-    # count the number of files in source directories, but need to take patterns and stuff into account...
-    # in a loop until the sample percentage (of the total source files) is met:
-    # pick a random file from source directories and calculate its sha256 sum
-    # extract the file from the latest archive (to stdout) and calculate its sha256 sum
-    # if the two checksums are equal, increment the matching files count
-    # if the percentage of matching files (of the total source files) < tolerance percentage, error
+    if 'spot' in checks:
+        spot_check(
+            repository,
+            config,
+            local_borg_version,
+            global_arguments,
+            local_path,
+            remote_path,
+        )
+        write_check_time(make_check_time_path(config, repository_id, 'spot'))
 
     borgmatic.hooks.command.execute_hook(
         config.get('after_check'),

+ 1 - 2
borgmatic/actions/json.py

@@ -1,6 +1,5 @@
-import logging
 import json
-
+import logging
 
 logger = logging.getLogger(__name__)
 

+ 40 - 35
borgmatic/borg/create.py

@@ -275,11 +275,11 @@ def collect_special_file_paths(
     create_command, config, local_path, working_directory, borg_environment, skip_directories
 ):
     '''
-    Given a Borg create command as a tuple, a local Borg path, a working directory, a dict of
-    environment variables to pass to Borg, and a sequence of parent directories to skip, collect the
-    paths for any special files (character devices, block devices, and named pipes / FIFOs) that
-    Borg would encounter during a create. These are all paths that could cause Borg to hang if its
-    --read-special flag is used.
+    Given a Borg create command as a tuple, a configuration dict, a local Borg path, a working
+    directory, a dict of environment variables to pass to Borg, and a sequence of parent directories
+    to skip, collect the paths for any special files (character devices, block devices, and named
+    pipes / FIFOs) that Borg would encounter during a create. These are all paths that could cause
+    Borg to hang if its --read-special flag is used.
     '''
     # Omit "--exclude-nodump" from the Borg dry run command, because that flag causes Borg to open
     # files including any named pipe we've created.
@@ -402,11 +402,6 @@ def make_base_create_command(
             ('--remote-ratelimit', str(upload_rate_limit)) if upload_rate_limit else ()
         )
 
-    if stream_processes and config.get('read_special') is False:
-        logger.warning(
-            f'{repository_path}: Ignoring configured "read_special" value of false, as true is needed for database hooks.'
-        )
-
     create_flags = (
         tuple(local_path.split(' '))
         + ('create',)
@@ -442,6 +437,41 @@ def make_base_create_command(
         repository_path, archive_name_format, local_borg_version
     ) + (sources if not pattern_file else ())
 
+    # If database hooks are enabled (as indicated by streaming processes), exclude files that might
+    # cause Borg to hang. But skip this if the user has explicitly set the "read_special" to True.
+    if stream_processes and not config.get('read_special'):
+        logger.warning(
+            f'{repository_path}: Ignoring configured "read_special" value of false, as true is needed for database hooks.'
+        )
+        try:
+            working_directory = os.path.expanduser(config.get('working_directory'))
+        except TypeError:
+            working_directory = None
+
+        borg_environment = environment.make_environment(config)
+
+        logger.debug(f'{repository_path}: Collecting special file paths')
+        special_file_paths = collect_special_file_paths(
+            create_flags + create_positional_arguments,
+            config,
+            local_path,
+            working_directory,
+            borg_environment,
+            skip_directories=borgmatic_source_directories,
+        )
+
+        if special_file_paths:
+            logger.warning(
+                f'{repository_path}: Excluding special files to prevent Borg from hanging: {", ".join(special_file_paths)}'
+            )
+            exclude_file = write_pattern_file(
+                expand_home_directories(
+                    tuple(config.get('exclude_patterns') or ()) + special_file_paths
+                ),
+                pattern_file=exclude_file,
+            )
+            create_flags += make_exclude_flags(config, exclude_file.name)
+
     return (create_flags, create_positional_arguments, pattern_file, exclude_file)
 
 
@@ -509,31 +539,6 @@ def create_archive(
 
     borg_environment = environment.make_environment(config)
 
-    # If database hooks are enabled (as indicated by streaming processes), exclude files that might
-    # cause Borg to hang. But skip this if the user has explicitly set the "read_special" to True.
-    if stream_processes and not config.get('read_special'):
-        logger.debug(f'{repository_path}: Collecting special file paths')
-        special_file_paths = collect_special_file_paths(
-            create_flags + create_positional_arguments,
-            config,
-            local_path,
-            working_directory,
-            borg_environment,
-            skip_directories=borgmatic_source_directories,
-        )
-
-        if special_file_paths:
-            logger.warning(
-                f'{repository_path}: Excluding special files to prevent Borg from hanging: {", ".join(special_file_paths)}'
-            )
-            exclude_file = write_pattern_file(
-                expand_home_directories(
-                    tuple(config.get('exclude_patterns') or ()) + special_file_paths
-                ),
-                pattern_file=exclude_file,
-            )
-            create_flags += make_exclude_flags(config, exclude_file.name)
-
     create_flags += (
         (('--info',) if logger.getEffectiveLevel() == logging.INFO and not json else ())
         + (('--stats',) if stats and not json and not dry_run else ())

+ 5 - 4
borgmatic/borg/list.py

@@ -95,14 +95,15 @@ def capture_archive_listing(
     local_borg_version,
     global_arguments,
     list_paths=None,
+    path_format=None,
     local_path='borg',
     remote_path=None,
 ):
     '''
     Given a local or remote repository path, an archive name, a configuration dict, the local Borg
-    version, global arguments as an argparse.Namespace, the archive paths in which to list files, and
-    local and remote Borg paths, capture the output of listing that archive and return it as a list
-    of file paths.
+    version, global arguments as an argparse.Namespace, the archive paths in which to list files,
+    the Borg path format to use for the output, and local and remote Borg paths, capture the output
+    of listing that archive and return it as a list of file paths.
     '''
     borg_environment = environment.make_environment(config)
 
@@ -118,7 +119,7 @@ def capture_archive_listing(
                     paths=[f'sh:{path}' for path in list_paths] if list_paths else None,
                     find_paths=None,
                     json=None,
-                    format='{path}{NL}',  # noqa: FS003
+                    format=path_format or '{path}{NL}',  # noqa: FS003
                 ),
                 global_arguments,
                 local_path,

+ 2 - 2
borgmatic/commands/arguments.py

@@ -614,10 +614,10 @@ def make_parsers():
     check_group.add_argument(
         '--only',
         metavar='CHECK',
-        choices=('repository', 'archives', 'data', 'extract'),
+        choices=('repository', 'archives', 'data', 'extract', 'spot'),
         dest='only_checks',
         action='append',
-        help='Run a particular consistency check (repository, archives, data, or extract) instead of configured checks (subject to configured frequency, can specify flag multiple times)',
+        help='Run a particular consistency check (repository, archives, data, extract, or spot) instead of configured checks (subject to configured frequency, can specify flag multiple times)',
     )
     check_group.add_argument(
         '--force',

+ 33 - 12
borgmatic/config/schema.yaml

@@ -543,8 +543,9 @@ properties:
                           example: 2 weeks
                 - required:
                     - name
-                    - sample_percentage
-                    - tolerance_percentage
+                    - data_sample_percentage
+                    - data_tolerance_percentage
+                    - count_tolerance_percentage
                   additionalProperties: false
                   properties:
                       name:
@@ -577,25 +578,45 @@ properties:
                               "always": running this check every time checks
                               are run.
                           example: 2 weeks
-                      sample_percentage:
+                      count_tolerance_percentage:
+                          type: number
+                          description: |
+                              The percentage delta between the source
+                              directories file count and the most recent backup
+                              archive file count that is allowed before the
+                              entire consistency check fails. This can catch
+                              problems like incorrect excludes, inadvertent
+                              deletes, etc. Only applies to the "spot" check.
+                          example: 10
+                      data_sample_percentage:
                           type: number
                           description: |
                               The percentage of total files in the source
                               directories to randomly sample and compare to
                               their corresponding files in the most recent
-                              backup archive. Only applies to the "spot"
-                              check.
-                          example: 5
-                      tolerance_percentage:
+                              backup archive. Only applies to the "spot" check.
+                          example: 1
+                      data_tolerance_percentage:
                           type: number
                           description: |
                               The percentage of total files in the source
-                              directories that can fail a spot check
-                              comparison without failing the entire
-                              consistency check. Should be lower than or
-                              equal to the "sample_percentage". Only applies
-                              to the "spot" check.
+                              directories that can fail a spot check comparison
+                              without failing the entire consistency check. This
+                              can catch problems like source files that have
+                              been bulk-changed by malware, backups that have
+                              been tampered with, etc. The value must be lower
+                              than or equal to the "contents_sample_percentage".
+                              Only applies to the "spot" check.
                           example: 0.5
+                      xxh64sum_command:
+                          type: string
+                          description: |
+                              Command to use instead of "xxh64sum" to hash
+                              source files, usually found in an OS package named
+                              "xxhash". Do not substitute with a different hash
+                              type (SHA, MD5, etc.) or the check will never
+                              succeed. Only applies to the "spot" check.
+                          example: /usr/local/bin/xxh64sum
         description: |
             List of one or more consistency checks to run on a periodic basis
             (if "frequency" is set) or every time borgmatic runs checks (if

+ 11 - 1
borgmatic/execute.py

@@ -4,6 +4,7 @@ import logging
 import os
 import select
 import subprocess
+import textwrap
 
 logger = logging.getLogger(__name__)
 
@@ -219,13 +220,22 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path, b
         }
 
 
+MAX_LOGGED_COMMAND_LENGTH = 1000
+
+
 def log_command(full_command, input_file=None, output_file=None, environment=None):
     '''
     Log the given command (a sequence of command/argument strings), along with its input/output file
     paths and extra environment variables (with omitted values in case they contain passwords).
     '''
     logger.debug(
-        ' '.join(tuple(f'{key}=***' for key in (environment or {}).keys()) + tuple(full_command))
+        textwrap.shorten(
+            ' '.join(
+                tuple(f'{key}=***' for key in (environment or {}).keys()) + tuple(full_command)
+            ),
+            width=MAX_LOGGED_COMMAND_LENGTH,
+            placeholder=' ...',
+        )
         + (f" < {getattr(input_file, 'name', '')}" if input_file else '')
         + (f" > {getattr(output_file, 'name', '')}" if output_file else '')
     )

+ 8 - 0
borgmatic/hooks/mariadb.py

@@ -115,6 +115,14 @@ def execute_dump_command(
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MariaDB database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MariaDB databases to a named pipe. The databases are supplied as a sequence of

+ 8 - 0
borgmatic/hooks/mongodb.py

@@ -16,6 +16,14 @@ def make_dump_path(config):  # pragma: no cover
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MongoDB database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(database.get('format') != 'directory' for database in databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MongoDB databases to a named pipe. The databases are supplied as a sequence of

+ 8 - 0
borgmatic/hooks/mysql.py

@@ -114,6 +114,14 @@ def execute_dump_command(
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of MySQL database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given MySQL/MariaDB databases to a named pipe. The databases are supplied as a sequence

+ 8 - 0
borgmatic/hooks/postgresql.py

@@ -96,6 +96,14 @@ def database_names_to_dump(database, extra_environment, log_prefix, dry_run):
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of PostgreSQL database configuration dicts, a configuration dict (ignored), and
+    a log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(database.get('format') != 'directory' for database in databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
     Dump the given PostgreSQL databases to a named pipe. The databases are supplied as a sequence of

+ 12 - 4
borgmatic/hooks/sqlite.py

@@ -17,9 +17,17 @@ def make_dump_path(config):  # pragma: no cover
     )
 
 
+def use_streaming(databases, config, log_prefix):
+    '''
+    Given a sequence of SQLite database configuration dicts, a configuration dict (ignored), and a
+    log prefix (ignored), return whether streaming will be using during dumps.
+    '''
+    return any(databases)
+
+
 def dump_data_sources(databases, config, log_prefix, dry_run):
     '''
-    Dump the given SQLite3 databases to a named pipe. The databases are supplied as a sequence of
+    Dump the given SQLite databases to a named pipe. The databases are supplied as a sequence of
     configuration dicts, as per the configuration schema. Use the given configuration dict to
     construct the destination path and the given log prefix in any log entries.
 
@@ -71,7 +79,7 @@ def dump_data_sources(databases, config, log_prefix, dry_run):
 
 def remove_data_source_dumps(databases, config, log_prefix, dry_run):  # pragma: no cover
     '''
-    Remove the given SQLite3 database dumps from the filesystem. The databases are supplied as a
+    Remove the given SQLite database dumps from the filesystem. The databases are supplied as a
     sequence of configuration dicts, as per the configuration schema. Use the given configuration
     dict to construct the destination path and the given log prefix in any log entries. If this is a
     dry run, then don't actually remove anything.
@@ -81,8 +89,8 @@ def remove_data_source_dumps(databases, config, log_prefix, dry_run):  # pragma:
 
 def make_data_source_dump_pattern(databases, config, log_prefix, name=None):  # pragma: no cover
     '''
-    Make a pattern that matches the given SQLite3 databases. The databases are supplied as a
-    sequence of configuration dicts, as per the configuration schema.
+    Make a pattern that matches the given SQLite databases. The databases are supplied as a sequence
+    of configuration dicts, as per the configuration schema.
     '''
     return dump.make_data_source_dump_filename(make_dump_path(config), name)
 

+ 474 - 0
tests/unit/actions/test_check.py

@@ -409,6 +409,444 @@ def test_upgrade_check_times_renames_stale_temporary_check_path():
     module.upgrade_check_times(flexmock(), flexmock())
 
 
+def test_collect_spot_check_source_paths_parses_borg_output():
+    flexmock(module.borgmatic.hooks.dispatch).should_receive('call_hooks').and_return(
+        {'hook1': False, 'hook2': True}
+    )
+    flexmock(module.borgmatic.borg.create).should_receive('make_base_create_command').with_args(
+        dry_run=True,
+        repository_path='repo',
+        config=object,
+        config_paths=(),
+        local_borg_version=object,
+        global_arguments=object,
+        borgmatic_source_directories=(),
+        local_path=object,
+        remote_path=object,
+        list_files=True,
+        stream_processes=True,
+    ).and_return((('borg', 'create'), ('repo::archive',), flexmock(), flexmock()))
+    flexmock(module.borgmatic.borg.environment).should_receive('make_environment').and_return(
+        flexmock()
+    )
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).and_return(
+        'warning: stuff\n- /etc/path\n+ /etc/other\n? /nope',
+    )
+    flexmock(module.os.path).should_receive('isfile').and_return(True)
+
+    assert module.collect_spot_check_source_paths(
+        repository={'path': 'repo'},
+        config={'working_directory': '/'},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path', '/etc/other')
+
+
+def test_collect_spot_check_source_paths_passes_through_stream_processes_false():
+    flexmock(module.borgmatic.hooks.dispatch).should_receive('call_hooks').and_return(
+        {'hook1': False, 'hook2': False}
+    )
+    flexmock(module.borgmatic.borg.create).should_receive('make_base_create_command').with_args(
+        dry_run=True,
+        repository_path='repo',
+        config=object,
+        config_paths=(),
+        local_borg_version=object,
+        global_arguments=object,
+        borgmatic_source_directories=(),
+        local_path=object,
+        remote_path=object,
+        list_files=True,
+        stream_processes=False,
+    ).and_return((('borg', 'create'), ('repo::archive',), flexmock(), flexmock()))
+    flexmock(module.borgmatic.borg.environment).should_receive('make_environment').and_return(
+        flexmock()
+    )
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).and_return(
+        'warning: stuff\n- /etc/path\n+ /etc/other\n? /nope',
+    )
+    flexmock(module.os.path).should_receive('isfile').and_return(True)
+
+    assert module.collect_spot_check_source_paths(
+        repository={'path': 'repo'},
+        config={'working_directory': '/'},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path', '/etc/other')
+
+
+def test_collect_spot_check_source_paths_without_working_directory_parses_borg_output():
+    flexmock(module.borgmatic.hooks.dispatch).should_receive('call_hooks').and_return(
+        {'hook1': False, 'hook2': True}
+    )
+    flexmock(module.borgmatic.borg.create).should_receive('make_base_create_command').with_args(
+        dry_run=True,
+        repository_path='repo',
+        config=object,
+        config_paths=(),
+        local_borg_version=object,
+        global_arguments=object,
+        borgmatic_source_directories=(),
+        local_path=object,
+        remote_path=object,
+        list_files=True,
+        stream_processes=True,
+    ).and_return((('borg', 'create'), ('repo::archive',), flexmock(), flexmock()))
+    flexmock(module.borgmatic.borg.environment).should_receive('make_environment').and_return(
+        flexmock()
+    )
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).and_return(
+        'warning: stuff\n- /etc/path\n+ /etc/other\n? /nope',
+    )
+    flexmock(module.os.path).should_receive('isfile').and_return(True)
+
+    assert module.collect_spot_check_source_paths(
+        repository={'path': 'repo'},
+        config={},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path', '/etc/other')
+
+
+def test_collect_spot_check_source_paths_includes_symlinks_but_skips_directories():
+    flexmock(module.borgmatic.hooks.dispatch).should_receive('call_hooks').and_return(
+        {'hook1': False, 'hook2': True}
+    )
+    flexmock(module.borgmatic.borg.create).should_receive('make_base_create_command').with_args(
+        dry_run=True,
+        repository_path='repo',
+        config=object,
+        config_paths=(),
+        local_borg_version=object,
+        global_arguments=object,
+        borgmatic_source_directories=(),
+        local_path=object,
+        remote_path=object,
+        list_files=True,
+        stream_processes=True,
+    ).and_return((('borg', 'create'), ('repo::archive',), flexmock(), flexmock()))
+    flexmock(module.borgmatic.borg.environment).should_receive('make_environment').and_return(
+        flexmock()
+    )
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).and_return(
+        'warning: stuff\n- /etc/path\n+ /etc/dir\n? /nope',
+    )
+    flexmock(module.os.path).should_receive('isfile').with_args('/etc/path').and_return(False)
+    flexmock(module.os.path).should_receive('islink').with_args('/etc/path').and_return(True)
+    flexmock(module.os.path).should_receive('isfile').with_args('/etc/dir').and_return(False)
+    flexmock(module.os.path).should_receive('islink').with_args('/etc/dir').and_return(False)
+
+    assert module.collect_spot_check_source_paths(
+        repository={'path': 'repo'},
+        config={'working_directory': '/'},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path',)
+
+
+def test_collect_spot_check_archive_paths_excludes_directories():
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        (
+            'f /etc/path',
+            'f /etc/other',
+            'd /etc/dir',
+        )
+    )
+
+    assert module.collect_spot_check_archive_paths(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path', '/etc/other')
+
+
+def test_collect_spot_check_archive_paths_excludes_file_in_borgmatic_source_directory():
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        (
+            'f /etc/path',
+            'f /root/.borgmatic/some/thing',
+        )
+    )
+
+    assert module.collect_spot_check_archive_paths(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={'borgmatic_source_directory': '/root/.borgmatic'},
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    ) == ('/etc/path',)
+
+
+def test_compare_spot_check_hashes_returns_paths_having_failing_hashes():
+    flexmock(module.random).should_receive('sample').replace_with(
+        lambda population, count: population[:count]
+    )
+    flexmock(module.os.path).should_receive('exists').and_return(True)
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).with_args(('xxh64sum', '/foo', '/bar')).and_return('hash1  /foo\nhash2  /bar')
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        ['hash1 /foo', 'nothash2 /bar']
+    )
+
+    assert module.compare_spot_check_hashes(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={
+            'checks': [
+                {
+                    'name': 'archives',
+                    'frequency': '2 weeks',
+                },
+                {
+                    'name': 'spot',
+                    'data_sample_percentage': 50,
+                },
+            ]
+        },
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+        log_label='repo',
+        source_paths=('/foo', '/bar', '/baz', '/quux'),
+    ) == ('/bar',)
+
+
+def test_compare_spot_check_hashes_uses_xxh64sum_command_option():
+    flexmock(module.random).should_receive('sample').replace_with(
+        lambda population, count: population[:count]
+    )
+    flexmock(module.os.path).should_receive('exists').and_return(True)
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).with_args(('/usr/local/bin/xxh64sum', '/foo', '/bar')).and_return('hash1  /foo\nhash2  /bar')
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        ['hash1 /foo', 'nothash2 /bar']
+    )
+
+    assert module.compare_spot_check_hashes(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={
+            'checks': [
+                {
+                    'name': 'spot',
+                    'data_sample_percentage': 50,
+                    'xxh64sum_command': '/usr/local/bin/xxh64sum',
+                },
+            ]
+        },
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+        log_label='repo',
+        source_paths=('/foo', '/bar', '/baz', '/quux'),
+    ) == ('/bar',)
+
+
+def test_compare_spot_check_hashes_consider_path_missing_from_archive_as_not_matching():
+    flexmock(module.random).should_receive('sample').replace_with(
+        lambda population, count: population[:count]
+    )
+    flexmock(module.os.path).should_receive('exists').and_return(True)
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).with_args(('xxh64sum', '/foo', '/bar')).and_return('hash1  /foo\nhash2  /bar')
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        ['hash1 /foo']
+    )
+
+    assert module.compare_spot_check_hashes(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={
+            'checks': [
+                {
+                    'name': 'spot',
+                    'data_sample_percentage': 50,
+                },
+            ]
+        },
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+        log_label='repo',
+        source_paths=('/foo', '/bar', '/baz', '/quux'),
+    ) == ('/bar',)
+
+
+def test_compare_spot_check_hashes_considers_non_existent_path_as_not_matching():
+    flexmock(module.random).should_receive('sample').replace_with(
+        lambda population, count: population[:count]
+    )
+    flexmock(module.os.path).should_receive('exists').with_args('/foo').and_return(True)
+    flexmock(module.os.path).should_receive('exists').with_args('/bar').and_return(False)
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).with_args(('xxh64sum', '/foo')).and_return('hash1  /foo')
+    flexmock(module.borgmatic.borg.list).should_receive('capture_archive_listing').and_return(
+        ['hash1 /foo', 'hash2 /bar']
+    )
+
+    assert module.compare_spot_check_hashes(
+        repository={'path': 'repo'},
+        archive='archive',
+        config={
+            'checks': [
+                {
+                    'name': 'spot',
+                    'data_sample_percentage': 50,
+                },
+            ]
+        },
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+        log_label='repo',
+        source_paths=('/foo', '/bar', '/baz', '/quux'),
+    ) == ('/bar',)
+
+
+def test_spot_check_data_tolerance_percenatge_greater_than_data_sample_percentage_errors():
+    with pytest.raises(ValueError):
+        module.spot_check(
+            repository={'path': 'repo'},
+            config={
+                'checks': [
+                    {
+                        'name': 'spot',
+                        'data_tolerance_percentage': 7,
+                        'data_sample_percentage': 5,
+                    },
+                ]
+            },
+            local_borg_version=flexmock(),
+            global_arguments=flexmock(),
+            local_path=flexmock(),
+            remote_path=flexmock(),
+        )
+
+
+def test_spot_check_with_count_delta_greater_than_count_tolerance_percentage_errors():
+    flexmock(module).should_receive('collect_spot_check_source_paths').and_return(
+        ('/foo', '/bar', '/baz', '/quux')
+    )
+    flexmock(module.borgmatic.borg.rlist).should_receive('resolve_archive_name').and_return(
+        'archive'
+    )
+    flexmock(module).should_receive('collect_spot_check_archive_paths').and_return(
+        ('/foo', '/bar')
+    ).once()
+
+    with pytest.raises(ValueError):
+        module.spot_check(
+            repository={'path': 'repo'},
+            config={
+                'checks': [
+                    {
+                        'name': 'spot',
+                        'count_tolerance_percentage': 1,
+                        'data_tolerance_percentage': 4,
+                        'data_sample_percentage': 5,
+                    },
+                ]
+            },
+            local_borg_version=flexmock(),
+            global_arguments=flexmock(),
+            local_path=flexmock(),
+            remote_path=flexmock(),
+        )
+
+
+def test_spot_check_with_failing_percentage_greater_than_data_tolerance_percentage_errors():
+    flexmock(module).should_receive('collect_spot_check_source_paths').and_return(
+        ('/foo', '/bar', '/baz', '/quux')
+    )
+    flexmock(module.borgmatic.borg.rlist).should_receive('resolve_archive_name').and_return(
+        'archive'
+    )
+    flexmock(module).should_receive('collect_spot_check_archive_paths').and_return(('/foo', '/bar'))
+    flexmock(module).should_receive('compare_spot_check_hashes').and_return(
+        ('/bar', '/baz', '/quux')
+    ).once()
+
+    with pytest.raises(ValueError):
+        module.spot_check(
+            repository={'path': 'repo'},
+            config={
+                'checks': [
+                    {
+                        'name': 'spot',
+                        'count_tolerance_percentage': 55,
+                        'data_tolerance_percentage': 4,
+                        'data_sample_percentage': 5,
+                    },
+                ]
+            },
+            local_borg_version=flexmock(),
+            global_arguments=flexmock(),
+            local_path=flexmock(),
+            remote_path=flexmock(),
+        )
+
+
+def test_spot_check_with_high_enough_tolerances_does_not_raise():
+    flexmock(module).should_receive('collect_spot_check_source_paths').and_return(
+        ('/foo', '/bar', '/baz', '/quux')
+    )
+    flexmock(module.borgmatic.borg.rlist).should_receive('resolve_archive_name').and_return(
+        'archive'
+    )
+    flexmock(module).should_receive('collect_spot_check_archive_paths').and_return(('/foo', '/bar'))
+    flexmock(module).should_receive('compare_spot_check_hashes').and_return(
+        ('/bar', '/baz', '/quux')
+    ).once()
+
+    module.spot_check(
+        repository={'path': 'repo'},
+        config={
+            'checks': [
+                {
+                    'name': 'spot',
+                    'count_tolerance_percentage': 55,
+                    'data_tolerance_percentage': 80,
+                    'data_sample_percentage': 80,
+                },
+            ]
+        },
+        local_borg_version=flexmock(),
+        global_arguments=flexmock(),
+        local_path=flexmock(),
+        remote_path=flexmock(),
+    )
+
+
 def test_run_check_checks_archives_for_configured_repository():
     flexmock(module.logger).answer = lambda message: None
     flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()
@@ -483,6 +921,42 @@ def test_run_check_runs_configured_extract_check():
     )
 
 
+def test_run_check_runs_configured_spot_check():
+    flexmock(module.logger).answer = lambda message: None
+    flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()
+    flexmock(module.borgmatic.borg.check).should_receive('get_repository_id').and_return(flexmock())
+    flexmock(module).should_receive('upgrade_check_times')
+    flexmock(module).should_receive('parse_checks')
+    flexmock(module.borgmatic.borg.check).should_receive('make_archive_filter_flags').and_return(())
+    flexmock(module).should_receive('make_archives_check_id').and_return(None)
+    flexmock(module).should_receive('filter_checks_on_frequency').and_return({'spot'})
+    flexmock(module.borgmatic.borg.check).should_receive('check_archives').never()
+    flexmock(module.borgmatic.actions.check).should_receive('spot_check').once()
+    flexmock(module).should_receive('make_check_time_path')
+    flexmock(module).should_receive('write_check_time')
+    flexmock(module.borgmatic.hooks.command).should_receive('execute_hook').times(2)
+    check_arguments = flexmock(
+        repository=None,
+        progress=flexmock(),
+        repair=flexmock(),
+        only_checks=flexmock(),
+        force=flexmock(),
+    )
+    global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)
+
+    module.run_check(
+        config_filename='test.yaml',
+        repository={'path': 'repo'},
+        config={'repositories': ['repo']},
+        hook_context={},
+        local_borg_version=None,
+        check_arguments=check_arguments,
+        global_arguments=global_arguments,
+        local_path=None,
+        remote_path=None,
+    )
+
+
 def test_run_check_without_checks_runs_nothing_except_hooks():
     flexmock(module.logger).answer = lambda message: None
     flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()

+ 0 - 1
tests/unit/actions/test_json.py

@@ -1,5 +1,4 @@
 import pytest
-from flexmock import flexmock
 
 from borgmatic.actions import json as module
 

+ 49 - 124
tests/unit/borg/test_create.py

@@ -948,7 +948,7 @@ def test_make_base_create_command_includes_list_flags_in_borg_command():
     assert not exclude_file
 
 
-def test_make_base_create_command_with_stream_processes_ignores_read_special_false_and_logs_warning():
+def test_make_base_create_command_with_stream_processes_ignores_read_special_false_and_excludes_special_files():
     flexmock(module).should_receive('deduplicate_directories').and_return(('foo', 'bar'))
     flexmock(module).should_receive('map_directories_to_devices').and_return({})
     flexmock(module).should_receive('expand_directories').and_return(())
@@ -959,12 +959,17 @@ def test_make_base_create_command_with_stream_processes_ignores_read_special_fal
     flexmock(module).should_receive('make_list_filter_flags').and_return('FOO')
     flexmock(module.feature).should_receive('available').and_return(True)
     flexmock(module).should_receive('ensure_files_readable')
-    flexmock(module.logger).should_receive('warning').once()
     flexmock(module).should_receive('make_pattern_flags').and_return(())
     flexmock(module).should_receive('make_exclude_flags').and_return(())
     flexmock(module.flags).should_receive('make_repository_archive_flags').and_return(
         (f'repo::{DEFAULT_ARCHIVE_NAME}',)
     )
+    flexmock(module.logger).should_receive('warning').twice()
+    flexmock(module.environment).should_receive('make_environment')
+    flexmock(module).should_receive('collect_special_file_paths').and_return(('/dev/null',)).once()
+    flexmock(module).should_receive('expand_home_directories').and_return(())
+    flexmock(module).should_receive('write_pattern_file').and_return(flexmock(name='patterns'))
+    flexmock(module).should_receive('make_exclude_flags').and_return(())
 
     (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
         module.make_base_create_command(
@@ -983,6 +988,48 @@ def test_make_base_create_command_with_stream_processes_ignores_read_special_fal
         )
     )
 
+    assert create_flags == ('borg', 'create', '--one-file-system', '--read-special')
+    assert create_positional_arguments == REPO_ARCHIVE_WITH_PATHS
+    assert not pattern_file
+    assert exclude_file
+
+
+def test_make_base_create_command_with_stream_processes_and_read_special_true_skip_special_files_excludes():
+    flexmock(module).should_receive('deduplicate_directories').and_return(('foo', 'bar'))
+    flexmock(module).should_receive('map_directories_to_devices').and_return({})
+    flexmock(module).should_receive('expand_directories').and_return(())
+    flexmock(module).should_receive('pattern_root_directories').and_return([])
+    flexmock(module.os.path).should_receive('expanduser').and_raise(TypeError)
+    flexmock(module).should_receive('expand_home_directories').and_return(())
+    flexmock(module).should_receive('write_pattern_file').and_return(None)
+    flexmock(module).should_receive('make_list_filter_flags').and_return('FOO')
+    flexmock(module.feature).should_receive('available').and_return(True)
+    flexmock(module).should_receive('ensure_files_readable')
+    flexmock(module).should_receive('make_pattern_flags').and_return(())
+    flexmock(module).should_receive('make_exclude_flags').and_return(())
+    flexmock(module.flags).should_receive('make_repository_archive_flags').and_return(
+        (f'repo::{DEFAULT_ARCHIVE_NAME}',)
+    )
+    flexmock(module.logger).should_receive('warning').never()
+    flexmock(module).should_receive('collect_special_file_paths').never()
+
+    (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
+        module.make_base_create_command(
+            dry_run=False,
+            repository_path='repo',
+            config={
+                'source_directories': ['foo', 'bar'],
+                'repositories': ['repo'],
+                'read_special': True,
+            },
+            config_paths=['/tmp/test.yaml'],
+            local_borg_version='1.2.3',
+            global_arguments=flexmock(log_json=False),
+            borgmatic_source_directories=(),
+            stream_processes=flexmock(),
+        )
+    )
+
     assert create_flags == ('borg', 'create', '--one-file-system', '--read-special')
     assert create_positional_arguments == REPO_ARCHIVE_WITH_PATHS
     assert not pattern_file
@@ -1709,7 +1756,6 @@ def test_create_archive_with_progress_and_stream_processes_calls_borg_with_progr
         )
     )
     flexmock(module.environment).should_receive('make_environment')
-    flexmock(module).should_receive('collect_special_file_paths').and_return(())
     create_command = (
         'borg',
         'create',
@@ -1754,127 +1800,6 @@ def test_create_archive_with_progress_and_stream_processes_calls_borg_with_progr
     )
 
 
-def test_create_archive_with_stream_processes_ands_read_special_false_excludes_special_files():
-    flexmock(module.borgmatic.logger).should_receive('add_custom_log_levels')
-    flexmock(module.logging).ANSWER = module.borgmatic.logger.ANSWER
-    processes = flexmock()
-    flexmock(module).should_receive('expand_directories').and_return(())
-    flexmock(module).should_receive('collect_borgmatic_source_directories').and_return([])
-    flexmock(module).should_receive('make_base_create_command').and_return(
-        (
-            ('borg', 'create', '--one-file-system', '--read-special'),
-            REPO_ARCHIVE_WITH_PATHS,
-            flexmock(),
-            flexmock(),
-        )
-    )
-    flexmock(module.environment).should_receive('make_environment')
-    flexmock(module).should_receive('collect_special_file_paths').and_return(('/dev/null',))
-    flexmock(module).should_receive('expand_home_directories').and_return(())
-    flexmock(module).should_receive('write_pattern_file').and_return(flexmock(name='patterns'))
-    flexmock(module).should_receive('make_exclude_flags').and_return(())
-    create_command = (
-        'borg',
-        'create',
-        '--one-file-system',
-        '--read-special',
-    ) + REPO_ARCHIVE_WITH_PATHS
-    flexmock(module).should_receive('execute_command_with_processes').with_args(
-        create_command + ('--dry-run', '--list'),
-        processes=processes,
-        output_log_level=logging.INFO,
-        output_file=None,
-        borg_local_path='borg',
-        borg_exit_codes=None,
-        working_directory=None,
-        extra_environment=None,
-    )
-    flexmock(module).should_receive('execute_command_with_processes').with_args(
-        create_command,
-        processes=processes,
-        output_log_level=logging.INFO,
-        output_file=None,
-        borg_local_path='borg',
-        borg_exit_codes=None,
-        working_directory=None,
-        extra_environment=None,
-    )
-
-    module.create_archive(
-        dry_run=False,
-        repository_path='repo',
-        config={
-            'source_directories': ['foo', 'bar'],
-            'repositories': ['repo'],
-            'exclude_patterns': None,
-            'read_special': False,
-        },
-        config_paths=['/tmp/test.yaml'],
-        local_borg_version='1.2.3',
-        global_arguments=flexmock(log_json=False),
-        stream_processes=processes,
-    )
-
-
-def test_create_archive_with_stream_processes_and_read_special_true_skips_special_files_excludes():
-    flexmock(module.borgmatic.logger).should_receive('add_custom_log_levels')
-    flexmock(module.logging).ANSWER = module.borgmatic.logger.ANSWER
-    processes = flexmock()
-    flexmock(module).should_receive('expand_directories').and_return(())
-    flexmock(module).should_receive('collect_borgmatic_source_directories').and_return([])
-    flexmock(module).should_receive('make_base_create_command').and_return(
-        (
-            ('borg', 'create', '--one-file-system', '--read-special'),
-            REPO_ARCHIVE_WITH_PATHS,
-            flexmock(),
-            flexmock(),
-        )
-    )
-    flexmock(module.environment).should_receive('make_environment')
-    flexmock(module).should_receive('collect_special_file_paths').never()
-    create_command = (
-        'borg',
-        'create',
-        '--one-file-system',
-        '--read-special',
-    ) + REPO_ARCHIVE_WITH_PATHS
-    flexmock(module).should_receive('execute_command_with_processes').with_args(
-        create_command + ('--dry-run', '--list'),
-        processes=processes,
-        output_log_level=logging.INFO,
-        output_file=None,
-        borg_local_path='borg',
-        borg_exit_codes=None,
-        working_directory=None,
-        extra_environment=None,
-    )
-    flexmock(module).should_receive('execute_command_with_processes').with_args(
-        create_command,
-        processes=processes,
-        output_log_level=logging.INFO,
-        output_file=None,
-        borg_local_path='borg',
-        borg_exit_codes=None,
-        working_directory=None,
-        extra_environment=None,
-    )
-
-    module.create_archive(
-        dry_run=False,
-        repository_path='repo',
-        config={
-            'source_directories': ['foo', 'bar'],
-            'repositories': ['repo'],
-            'exclude_patterns': None,
-            'read_special': True,
-        },
-        config_paths=['/tmp/test.yaml'],
-        local_borg_version='1.2.3',
-        global_arguments=flexmock(log_json=False),
-        stream_processes=processes,
-    )
-
-
 def test_create_archive_with_json_calls_borg_with_json_flag():
     flexmock(module.borgmatic.logger).should_receive('add_custom_log_levels')
     flexmock(module.logging).ANSWER = module.borgmatic.logger.ANSWER

+ 10 - 0
tests/unit/hooks/test_mariadb.py

@@ -44,6 +44,16 @@ def test_database_names_to_dump_queries_mariadb_for_database_names():
     assert names == ('foo', 'bar')
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_dumps_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 20 - 0
tests/unit/hooks/test_mongodb.py

@@ -5,6 +5,26 @@ from flexmock import flexmock
 from borgmatic.hooks import mongodb as module
 
 
+def test_use_streaming_true_for_any_non_directory_format_databases():
+    assert module.use_streaming(
+        databases=[{'format': 'stuff'}, {'format': 'directory'}, {}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_all_directory_format_databases():
+    assert not module.use_streaming(
+        databases=[{'format': 'directory'}, {'format': 'directory'}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_runs_mongodump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 10 - 0
tests/unit/hooks/test_mysql.py

@@ -44,6 +44,16 @@ def test_database_names_to_dump_queries_mysql_for_database_names():
     assert names == ('foo', 'bar')
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_dumps_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 20 - 0
tests/unit/hooks/test_postgresql.py

@@ -199,6 +199,26 @@ def test_database_names_to_dump_with_all_and_psql_command_uses_custom_command():
     )
 
 
+def test_use_streaming_true_for_any_non_directory_format_databases():
+    assert module.use_streaming(
+        databases=[{'format': 'stuff'}, {'format': 'directory'}, {}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_all_directory_format_databases():
+    assert not module.use_streaming(
+        databases=[{'format': 'directory'}, {'format': 'directory'}],
+        config=flexmock(),
+        log_prefix=flexmock(),
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_runs_pg_dump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     processes = [flexmock(), flexmock()]

+ 10 - 0
tests/unit/hooks/test_sqlite.py

@@ -5,6 +5,16 @@ from flexmock import flexmock
 from borgmatic.hooks import sqlite as module
 
 
+def test_use_streaming_true_for_any_databases():
+    assert module.use_streaming(
+        databases=[flexmock(), flexmock()], config=flexmock(), log_prefix=flexmock()
+    )
+
+
+def test_use_streaming_false_for_no_databases():
+    assert not module.use_streaming(databases=[], config=flexmock(), log_prefix=flexmock())
+
+
 def test_dump_data_sources_logs_and_skips_if_dump_already_exists():
     databases = [{'path': '/path/to/database', 'name': 'database'}]
 

+ 7 - 0
tests/unit/test_execute.py

@@ -123,6 +123,13 @@ def test_append_last_lines_with_output_log_level_none_appends_captured_output():
         (('foo', 'bar'), None, None, None, 'foo bar'),
         (('foo', 'bar'), flexmock(name='input'), None, None, 'foo bar < input'),
         (('foo', 'bar'), None, flexmock(name='output'), None, 'foo bar > output'),
+        (
+            ('A',) * module.MAX_LOGGED_COMMAND_LENGTH,
+            None,
+            None,
+            None,
+            'A ' * (module.MAX_LOGGED_COMMAND_LENGTH // 2 - 2) + '...',
+        ),
         (
             ('foo', 'bar'),
             flexmock(name='input'),