Переглянути джерело

Stream database dumps and restores directly to/from Borg without using any additional filesystem space (#258).

Dan Helfman 5 роки тому
батько
коміт
a23fdf946d

+ 3 - 0
NEWS

@@ -1,4 +1,7 @@
 1.5.3.dev0
+ * #258: Stream database dumps and restores directly to/from Borg without using any additional
+   filesystem space. This feature is automatic, and works even on restores from archives made with
+   previous versions of borgmatic.
  * #293: Documentation on macOS launchd permissions issues with work-around for Full Disk Access.
 
 1.5.2

+ 20 - 4
borgmatic/borg/create.py

@@ -4,7 +4,11 @@ import logging
 import os
 import tempfile
 
-from borgmatic.execute import execute_command, execute_command_without_capture
+from borgmatic.execute import (
+    execute_command,
+    execute_command_with_processes,
+    execute_command_without_capture,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -125,6 +129,9 @@ def borgmatic_source_directories(borgmatic_source_directory):
     )
 
 
+DEFAULT_ARCHIVE_NAME_FORMAT = '{hostname}-{now:%Y-%m-%dT%H:%M:%S.%f}'
+
+
 def create_archive(
     dry_run,
     repository,
@@ -136,10 +143,14 @@ def create_archive(
     stats=False,
     json=False,
     files=False,
+    stream_processes=None,
 ):
     '''
     Given verbosity/dry-run flags, a local or remote repository path, a location config dict, and a
     storage config dict, create a Borg archive and return Borg's JSON output (if any).
+
+    If a sequence of stream processes is given (instances of subprocess.Popen), then execute the
+    create command while also triggering the given processes to produce output.
     '''
     sources = _expand_directories(
         location_config['source_directories']
@@ -157,8 +168,7 @@ def create_archive(
     umask = storage_config.get('umask', None)
     lock_wait = storage_config.get('lock_wait', None)
     files_cache = location_config.get('files_cache')
-    default_archive_name_format = '{hostname}-{now:%Y-%m-%dT%H:%M:%S.%f}'
-    archive_name_format = storage_config.get('archive_name_format', default_archive_name_format)
+    archive_name_format = storage_config.get('archive_name_format', DEFAULT_ARCHIVE_NAME_FORMAT)
     extra_borg_options = storage_config.get('extra_borg_options', {}).get('create', '')
 
     full_command = (
@@ -174,7 +184,7 @@ def create_archive(
         + (('--noatime',) if location_config.get('atime') is False else ())
         + (('--noctime',) if location_config.get('ctime') is False else ())
         + (('--nobirthtime',) if location_config.get('birthtime') is False else ())
-        + (('--read-special',) if location_config.get('read_special') else ())
+        + (('--read-special',) if (location_config.get('read_special') or stream_processes) else ())
         + (('--nobsdflags',) if location_config.get('bsd_flags') is False else ())
         + (('--files-cache', files_cache) if files_cache else ())
         + (('--remote-path', remote_path) if remote_path else ())
@@ -198,6 +208,7 @@ def create_archive(
 
     # The progress output isn't compatible with captured and logged output, as progress messes with
     # the terminal directly.
+    # FIXME: "--progress" and stream_processes can't be used together.
     if progress:
         execute_command_without_capture(full_command, error_on_warnings=False)
         return
@@ -209,4 +220,9 @@ def create_archive(
     else:
         output_log_level = logging.INFO
 
+    if stream_processes:
+        return execute_command_with_processes(
+            full_command, stream_processes, output_log_level, error_on_warnings=False
+        )
+
     return execute_command(full_command, output_log_level, error_on_warnings=False)

+ 16 - 1
borgmatic/borg/extract.py

@@ -1,5 +1,6 @@
 import logging
 import os
+import subprocess
 
 from borgmatic.execute import execute_command, execute_command_without_capture
 
@@ -63,12 +64,16 @@ def extract_archive(
     destination_path=None,
     progress=False,
     error_on_warnings=True,
+    extract_to_stdout=False,
 ):
     '''
     Given a dry-run flag, a local or remote repository path, an archive name, zero or more paths to
     restore from the archive, location/storage configuration dicts, optional local and remote Borg
     paths, and an optional destination path to extract to, extract the archive into the current
     directory.
+
+    If extract to stdout is True, then start the extraction streaming to stdout, and return that
+    extract process as an instance of subprocess.Popen.
     '''
     umask = storage_config.get('umask', None)
     lock_wait = storage_config.get('lock_wait', None)
@@ -83,6 +88,7 @@ def extract_archive(
         + (('--debug', '--list', '--show-rc') if logger.isEnabledFor(logging.DEBUG) else ())
         + (('--dry-run',) if dry_run else ())
         + (('--progress',) if progress else ())
+        + (('--stdout',) if extract_to_stdout else ())
         + ('::'.join((repository if ':' in repository else os.path.abspath(repository), archive)),)
         + (tuple(paths) if paths else ())
     )
@@ -93,7 +99,16 @@ def extract_archive(
         execute_command_without_capture(
             full_command, working_directory=destination_path, error_on_warnings=error_on_warnings
         )
-        return
+        return None
+
+    if extract_to_stdout:
+        return execute_command(
+            full_command,
+            output_file=subprocess.PIPE,
+            working_directory=destination_path,
+            error_on_warnings=error_on_warnings,
+            run_to_completion=False,
+        )
 
     # Error on warnings by default, as Borg only gives a warning if the restore paths don't exist in
     # the archive!

+ 71 - 57
borgmatic/commands/borgmatic.py

@@ -83,14 +83,6 @@ def run_configuration(config_filename, config, arguments):
                 'pre-backup',
                 global_arguments.dry_run,
             )
-            dispatch.call_hooks(
-                'dump_databases',
-                hooks,
-                config_filename,
-                dump.DATABASE_HOOK_NAMES,
-                location,
-                global_arguments.dry_run,
-            )
         if 'check' in arguments:
             command.execute_hook(
                 hooks.get('before_check'),
@@ -262,6 +254,16 @@ def run_actions(
         )
     if 'create' in arguments:
         logger.info('{}: Creating archive{}'.format(repository, dry_run_label))
+        active_dumps = dispatch.call_hooks(
+            'dump_databases',
+            hooks,
+            repository,
+            dump.DATABASE_HOOK_NAMES,
+            location,
+            global_arguments.dry_run,
+        )
+        stream_processes = [process for processes in active_dumps.values() for process in processes]
+
         json_output = borg_create.create_archive(
             global_arguments.dry_run,
             repository,
@@ -273,9 +275,11 @@ def run_actions(
             stats=arguments['create'].stats,
             json=arguments['create'].json,
             files=arguments['create'].files,
+            stream_processes=stream_processes,
         )
         if json_output:
             yield json.loads(json_output)
+
     if 'check' in arguments and checks.repository_enabled_for_checks(repository, consistency):
         logger.info('{}: Running consistency checks'.format(repository))
         borg_check.check_archives(
@@ -347,57 +351,67 @@ def run_actions(
             if 'all' in restore_names:
                 restore_names = []
 
-            # Extract dumps for the named databases from the archive.
-            dump_patterns = dispatch.call_hooks(
-                'make_database_dump_patterns',
-                hooks,
-                repository,
-                dump.DATABASE_HOOK_NAMES,
-                location,
-                restore_names,
-            )
-
-            borg_extract.extract_archive(
-                global_arguments.dry_run,
-                repository,
-                borg_list.resolve_archive_name(
-                    repository, arguments['restore'].archive, storage, local_path, remote_path
-                ),
-                dump.convert_glob_patterns_to_borg_patterns(
-                    dump.flatten_dump_patterns(dump_patterns, restore_names)
-                ),
-                location,
-                storage,
-                local_path=local_path,
-                remote_path=remote_path,
-                destination_path='/',
-                progress=arguments['restore'].progress,
-                # We don't want glob patterns that don't match to error.
-                error_on_warnings=False,
-            )
-
-            # Map the restore names or detected dumps to the corresponding database configurations.
-            restore_databases = dump.get_per_hook_database_configurations(
-                hooks, restore_names, dump_patterns
+            archive_name = borg_list.resolve_archive_name(
+                repository, arguments['restore'].archive, storage, local_path, remote_path
             )
+            found_names = set()
+
+            for hook_name, per_hook_restore_databases in hooks.items():
+                if hook_name not in dump.DATABASE_HOOK_NAMES:
+                    continue
+
+                for restore_database in per_hook_restore_databases:
+                    database_name = restore_database['name']
+                    if restore_names and database_name not in restore_names:
+                        continue
+
+                    found_names.add(database_name)
+                    dump_pattern = dispatch.call_hooks(
+                        'make_database_dump_pattern',
+                        hooks,
+                        repository,
+                        dump.DATABASE_HOOK_NAMES,
+                        location,
+                        database_name,
+                    )[hook_name]
+
+                    # Kick off a single database extract to stdout.
+                    extract_process = borg_extract.extract_archive(
+                        dry_run=global_arguments.dry_run,
+                        repository=repository,
+                        archive=archive_name,
+                        paths=dump.convert_glob_patterns_to_borg_patterns([dump_pattern]),
+                        location_config=location,
+                        storage_config=storage,
+                        local_path=local_path,
+                        remote_path=remote_path,
+                        destination_path='/',
+                        progress=arguments['restore'].progress,
+                        extract_to_stdout=True,
+                    )
+
+                    # Run a single database restore, consuming the extract stdout.
+                    dispatch.call_hooks(
+                        'restore_database_dump',
+                        {hook_name: [restore_database]},
+                        repository,
+                        dump.DATABASE_HOOK_NAMES,
+                        location,
+                        global_arguments.dry_run,
+                        extract_process,
+                    )
+
+            if not restore_names and not found_names:
+                raise ValueError('No databases were found to restore')
+
+            missing_names = sorted(set(restore_names) - found_names)
+            if missing_names:
+                raise ValueError(
+                    'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format(
+                        ', '.join(missing_names)
+                    )
+                )
 
-            # Finally, restore the databases and cleanup the dumps.
-            dispatch.call_hooks(
-                'restore_database_dumps',
-                restore_databases,
-                repository,
-                dump.DATABASE_HOOK_NAMES,
-                location,
-                global_arguments.dry_run,
-            )
-            dispatch.call_hooks(
-                'remove_database_dumps',
-                restore_databases,
-                repository,
-                dump.DATABASE_HOOK_NAMES,
-                location,
-                global_arguments.dry_run,
-            )
     if 'list' in arguments:
         if arguments['list'].repository is None or validate.repositories_match(
             repository, arguments['list'].repository

+ 168 - 20
borgmatic/execute.py

@@ -1,5 +1,7 @@
+import collections
 import logging
 import os
+import select
 import subprocess
 
 logger = logging.getLogger(__name__)
@@ -20,12 +22,20 @@ def exit_code_indicates_error(exit_code, error_on_warnings=True):
     return bool(exit_code >= BORG_ERROR_EXIT_CODE)
 
 
-def log_output(command, process, output_buffer, output_log_level, error_on_warnings):
+def process_command(process):
     '''
-    Given a command already executed, its process opened by subprocess.Popen(), and the process'
-    relevant output buffer (stderr or stdout), log its output with the requested log level.
-    Additionally, raise a CalledProcessException if the process exits with an error (or a warning,
-    if error on warnings is True).
+    Given a process as an instance of subprocess.Popen, return the command string that was used to
+    invoke it.
+    '''
+    return process.args if isinstance(process.args, str) else ' '.join(process.args)
+
+
+def log_output(process, output_buffer, output_log_level, error_on_warnings):
+    '''
+    Given an executed command's process opened by subprocess.Popen(), and the process' relevant
+    output buffer (stderr or stdout), log its output with the requested log level. Additionally,
+    raise a CalledProcessError if the process exits with an error (or a warning, if error on
+    warnings is True).
     '''
     last_lines = []
 
@@ -54,7 +64,86 @@ def log_output(command, process, output_buffer, output_log_level, error_on_warni
         if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT:
             last_lines.insert(0, '...')
 
-        raise subprocess.CalledProcessError(exit_code, ' '.join(command), '\n'.join(last_lines))
+        raise subprocess.CalledProcessError(
+            exit_code, process_command(process), '\n'.join(last_lines)
+        )
+
+
+def output_buffer_for_process(process, exclude_stdouts):
+    '''
+    Given an instance of subprocess.Popen and a sequence of stdouts to exclude, return either the
+    process's stdout or stderr. The idea is that if stdout is excluded for a process, we still have
+    stderr to log.
+    '''
+    return process.stderr if process.stdout in exclude_stdouts else process.stdout
+
+
+def log_many_outputs(processes, exclude_stdouts, output_log_level, error_on_warnings):
+    '''
+    Given a sequence of subprocess.Popen() instances for multiple processes, log the output for each
+    process with the requested log level. Additionally, raise a CalledProcessError if a process
+    exits with an error (or a warning, if error on warnings is True).
+
+    For simplicity, it's assumed that the output buffer for each process is its stdout. But if any
+    stdouts are given to exclude, then for any matching processes, log from their stderr instead.
+    '''
+    # Map from output buffer to sequence of last lines.
+    buffer_last_lines = collections.defaultdict(list)
+    output_buffers = [output_buffer_for_process(process, exclude_stdouts) for process in processes]
+
+    while True:
+        (ready_buffers, _, _) = select.select(output_buffers, [], [])
+
+        for ready_buffer in ready_buffers:
+            line = ready_buffer.readline().rstrip().decode()
+            if not line:
+                continue
+
+            # Keep the last few lines of output in case the process errors, and we need the output for
+            # the exception below.
+            last_lines = buffer_last_lines[ready_buffer]
+            last_lines.append(line)
+            if len(last_lines) > ERROR_OUTPUT_MAX_LINE_COUNT:
+                last_lines.pop(0)
+
+            logger.log(output_log_level, line)
+
+        if all(process.poll() is not None for process in processes):
+            break
+
+    for process in processes:
+        remaining_output = (
+            output_buffer_for_process(process, exclude_stdouts).read().rstrip().decode()
+        )
+        if remaining_output:  # pragma: no cover
+            logger.log(output_log_level, remaining_output)
+
+    for process in processes:
+        exit_code = process.poll()
+
+        if exit_code_indicates_error(exit_code, error_on_warnings):
+            # If an error occurs, include its output in the raised exception so that we don't
+            # inadvertently hide error output.
+            output_buffer = output_buffer_for_process(process, exclude_stdouts)
+            last_lines = buffer_last_lines[output_buffer]
+            if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT:
+                last_lines.insert(0, '...')
+
+            raise subprocess.CalledProcessError(
+                exit_code, process_command(process), '\n'.join(last_lines)
+            )
+
+
+def log_command(full_command, input_file, output_file):
+    '''
+    Log the given command (a sequence of command/argument strings), along with its input/output file
+    paths.
+    '''
+    logger.debug(
+        ' '.join(full_command)
+        + (' < {}'.format(getattr(input_file, 'name', '')) if input_file else '')
+        + (' > {}'.format(getattr(output_file, 'name', '')) if output_file else '')
+    )
 
 
 def execute_command(
@@ -66,24 +155,23 @@ def execute_command(
     extra_environment=None,
     working_directory=None,
     error_on_warnings=True,
+    run_to_completion=True,
 ):
     '''
     Execute the given command (a sequence of command/argument strings) and log its output at the
-    given log level. If output log level is None, instead capture and return the output. If an
-    open output file object is given, then write stdout to the file and only log stderr (but only
-    if an output log level is set). If an open input file object is given, then read stdin from the
-    file. If shell is True, execute the command within a shell. If an extra environment dict is
-    given, then use it to augment the current environment, and pass the result into the command. If
-    a working directory is given, use that as the present working directory when running the
-    command. If error on warnings is False, then treat exit code 1 as a warning instead of an error.
+    given log level. If output log level is None, instead capture and return the output. (Implies
+    run_to_completion.) If an open output file object is given, then write stdout to the file and
+    only log stderr (but only if an output log level is set). If an open input file object is given,
+    then read stdin from the file. If shell is True, execute the command within a shell. If an extra
+    environment dict is given, then use it to augment the current environment, and pass the result
+    into the command. If a working directory is given, use that as the present working directory
+    when running the command. If error on warnings is False, then treat exit code 1 as a warning
+    instead of an error. If run to completion is False, then return the process for the command
+    without executing it to completion.
 
     Raise subprocess.CalledProcessError if an error occurs while running the command.
     '''
-    logger.debug(
-        ' '.join(full_command)
-        + (' < {}'.format(input_file.name) if input_file else '')
-        + (' > {}'.format(output_file.name) if output_file else '')
-    )
+    log_command(full_command, input_file, output_file)
     environment = {**os.environ, **extra_environment} if extra_environment else None
 
     if output_log_level is None:
@@ -93,7 +181,7 @@ def execute_command(
         return output.decode() if output is not None else None
     else:
         process = subprocess.Popen(
-            full_command,
+            ' '.join(full_command) if shell else full_command,
             stdin=input_file,
             stdout=output_file or subprocess.PIPE,
             stderr=subprocess.PIPE if output_file else subprocess.STDOUT,
@@ -101,8 +189,10 @@ def execute_command(
             env=environment,
             cwd=working_directory,
         )
+        if not run_to_completion:
+            return process
+
         log_output(
-            full_command,
             process,
             process.stderr if output_file else process.stdout,
             output_log_level,
@@ -126,3 +216,61 @@ def execute_command_without_capture(full_command, working_directory=None, error_
     except subprocess.CalledProcessError as error:
         if exit_code_indicates_error(error.returncode, error_on_warnings):
             raise
+
+
+def execute_command_with_processes(
+    full_command,
+    processes,
+    output_log_level=logging.INFO,
+    output_file=None,
+    input_file=None,
+    shell=False,
+    extra_environment=None,
+    working_directory=None,
+    error_on_warnings=True,
+):
+    '''
+    Execute the given command (a sequence of command/argument strings) and log its output at the
+    given log level. Simultaneously, continue to poll one or more active processes so that they
+    run as well. This is useful, for instance, for processes that are streaming output to a named
+    pipe that the given command is consuming from.
+
+    If an open output file object is given, then write stdout to the file and only log stderr (but
+    only if an output log level is set). If an open input file object is given, then read stdin from
+    the file. If shell is True, execute the command within a shell. If an extra environment dict is
+    given, then use it to augment the current environment, and pass the result into the command. If
+    a working directory is given, use that as the present working directory when running the
+    command. If error on warnings is False, then treat exit code 1 as a warning instead of an
+    error.
+
+    Raise subprocess.CalledProcessError if an error occurs while running the command or in the
+    upstream process.
+    '''
+    log_command(full_command, input_file, output_file)
+    environment = {**os.environ, **extra_environment} if extra_environment else None
+
+    try:
+        command_process = subprocess.Popen(
+            full_command,
+            stdin=input_file,
+            stdout=output_file or subprocess.PIPE,
+            stderr=subprocess.PIPE if output_file else subprocess.STDOUT,
+            shell=shell,
+            env=environment,
+            cwd=working_directory,
+        )
+    except (subprocess.CalledProcessError, OSError):
+        # Something has gone wrong. So vent each process' output buffer to prevent it from hanging.
+        # And then kill the process.
+        for process in processes:
+            if process.poll() is None:
+                process.stdout.read(0)
+                process.kill()
+        raise
+
+    log_many_outputs(
+        tuple(processes) + (command_process,),
+        (input_file, output_file),
+        output_log_level,
+        error_on_warnings,
+    )

+ 5 - 94
borgmatic/hooks/dump.py

@@ -1,4 +1,3 @@
-import glob
 import logging
 import os
 import shutil
@@ -34,24 +33,13 @@ def make_database_dump_filename(dump_path, name, hostname=None):
     return os.path.join(os.path.expanduser(dump_path), hostname or 'localhost', name)
 
 
-def flatten_dump_patterns(dump_patterns, names):
+def create_named_pipe_for_dump(dump_path):
     '''
-    Given a dict from a database hook name to glob patterns matching the dumps for the named
-    databases, flatten out all the glob patterns into a single sequence, and return it.
-
-    Raise ValueError if there are no resulting glob patterns, which indicates that databases are not
-    configured in borgmatic's configuration.
+    Create a named pipe at the given dump path.
     '''
-    flattened = [pattern for patterns in dump_patterns.values() for pattern in patterns]
-
-    if not flattened:
-        raise ValueError(
-            'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format(
-                ', '.join(names) or '"all"'
-            )
-        )
-
-    return flattened
+    os.makedirs(os.path.dirname(dump_path), mode=0o700, exist_ok=True)
+    if not os.path.exists(dump_path):
+        os.mkfifo(dump_path, mode=0o600)
 
 
 def remove_database_dumps(dump_path, databases, database_type_name, log_prefix, dry_run):
@@ -100,80 +88,3 @@ def convert_glob_patterns_to_borg_patterns(patterns):
     patterns like "sh:etc/*".
     '''
     return ['sh:{}'.format(pattern.lstrip(os.path.sep)) for pattern in patterns]
-
-
-def get_database_names_from_dumps(patterns):
-    '''
-    Given a sequence of database dump patterns, find the corresponding database dumps on disk and
-    return the database names from their filenames.
-    '''
-    return [os.path.basename(dump_path) for pattern in patterns for dump_path in glob.glob(pattern)]
-
-
-def get_database_configurations(databases, names):
-    '''
-    Given the full database configuration dicts as per the configuration schema, and a sequence of
-    database names, filter down and yield the configuration for just the named databases.
-    Additionally, if a database configuration is named "all", project out that configuration for
-    each named database.
-    '''
-    named_databases = {database['name']: database for database in databases}
-
-    for name in names:
-        database = named_databases.get(name)
-        if database:
-            yield database
-            continue
-
-        if 'all' in named_databases:
-            yield {**named_databases['all'], **{'name': name}}
-            continue
-
-
-def get_per_hook_database_configurations(hooks, names, dump_patterns):
-    '''
-    Given the hooks configuration dict as per the configuration schema, a sequence of database
-    names to restore, and a dict from database hook name to glob patterns for matching dumps,
-    filter down the configuration for just the named databases.
-
-    If there are no named databases given, then find the corresponding database dumps on disk and
-    use the database names from their filenames. Additionally, if a database configuration is named
-    "all", project out that configuration for each named database.
-
-    Return the results as a dict from database hook name to a sequence of database configuration
-    dicts for that database type.
-
-    Raise ValueError if one of the database names cannot be matched to a database in borgmatic's
-    database configuration.
-    '''
-    hook_databases = {
-        hook_name: list(
-            get_database_configurations(
-                hooks.get(hook_name),
-                names or get_database_names_from_dumps(dump_patterns[hook_name]),
-            )
-        )
-        for hook_name in DATABASE_HOOK_NAMES
-        if hook_name in hooks
-    }
-
-    if not names or 'all' in names:
-        if not any(hook_databases.values()):
-            raise ValueError(
-                'Cannot restore database "all", as there are no database dumps in the archive'
-            )
-
-        return hook_databases
-
-    found_names = {
-        database['name'] for databases in hook_databases.values() for database in databases
-    }
-    missing_names = sorted(set(names) - found_names)
-    if missing_names:
-        raise ValueError(
-            'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format(
-                ', '.join(missing_names)
-            )
-        )
-
-    return hook_databases

+ 58 - 41
borgmatic/hooks/mysql.py

@@ -1,7 +1,6 @@
 import logging
-import os
 
-from borgmatic.execute import execute_command
+from borgmatic.execute import execute_command, execute_command_with_processes
 from borgmatic.hooks import dump
 
 logger = logging.getLogger(__name__)
@@ -55,12 +54,16 @@ def database_names_to_dump(database, extra_environment, log_prefix, dry_run_labe
 
 def dump_databases(databases, log_prefix, location_config, dry_run):
     '''
-    Dump the given MySQL/MariaDB databases to disk. The databases are supplied as a sequence of
-    dicts, one dict describing each database as per the configuration schema. Use the given log
+    Dump the given MySQL/MariaDB databases to a named pipe. The databases are supplied as a sequence
+    of dicts, one dict describing each database as per the configuration schema. Use the given log
     prefix in any log entries. Use the given location configuration dict to construct the
-    destination path. If this is a dry run, then don't actually dump anything.
+    destination path.
+
+    Return a sequence of subprocess.Popen instances for the dump processes ready to spew to a named
+    pipe. But if this is a dry run, then don't actually dump anything and return an empty sequence.
     '''
     dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
+    processes = []
 
     logger.info('{}: Dumping MySQL databases{}'.format(log_prefix, dry_run_label))
 
@@ -75,7 +78,8 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
         )
 
         dump_command = (
-            ('mysqldump', '--add-drop-database')
+            ('mysqldump',)
+            + ('--add-drop-database',)
             + (('--host', database['hostname']) if 'hostname' in database else ())
             + (('--port', str(database['port'])) if 'port' in database else ())
             + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
@@ -83,6 +87,9 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
             + (tuple(database['options'].split(' ')) if 'options' in database else ())
             + ('--databases',)
             + dump_command_names
+            # Use shell redirection rather than execute_command(output_file=open(...)) to prevent
+            # the open() call on a named pipe from hanging the main borgmatic process.
+            + ('>', dump_filename)
         )
 
         logger.debug(
@@ -90,13 +97,21 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
                 log_prefix, requested_name, dump_filename, dry_run_label
             )
         )
-        if not dry_run:
-            os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
+        if dry_run:
+            continue
+
+        dump.create_named_pipe_for_dump(dump_filename)
+
+        processes.append(
             execute_command(
                 dump_command,
-                output_file=open(dump_filename, 'w'),
+                shell=True,
                 extra_environment=extra_environment,
+                run_to_completion=False,
             )
+        )
+
+    return processes
 
 
 def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # pragma: no cover
@@ -111,45 +126,47 @@ def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # p
     )
 
 
-def make_database_dump_patterns(databases, log_prefix, location_config, names):
+def make_database_dump_pattern(databases, log_prefix, location_config, name=None):
     '''
     Given a sequence of configurations dicts, a prefix to log with, a location configuration dict,
-    and a sequence of database names to match, return the corresponding glob patterns to match the
-    database dumps in an archive. An empty sequence of names indicates that the patterns should
-    match all dumps.
+    and a database name to match, return the corresponding glob pattern to match the database dump
+    in an archive.
     '''
-    return [
-        dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
-        for name in (names or ['*'])
-    ]
+    return dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
 
 
-def restore_database_dumps(databases, log_prefix, location_config, dry_run):
+def restore_database_dump(database_config, log_prefix, location_config, dry_run, extract_process):
     '''
-    Restore the given MySQL/MariaDB databases from disk. The databases are supplied as a sequence of
-    dicts, one dict describing each database as per the configuration schema. Use the given log
-    prefix in any log entries. Use the given location configuration dict to construct the
-    destination path. If this is a dry run, then don't actually restore anything.
+    Restore the given MySQL/MariaDB database from an extract stream. The database is supplied as a
+    one-element sequence containing a dict describing the database, as per the configuration schema.
+    Use the given log prefix in any log entries. If this is a dry run, then don't actually restore
+    anything. Trigger the given active extract process (an instance of subprocess.Popen) to produce
+    output to consume.
     '''
     dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else ''
 
-    for database in databases:
-        dump_filename = dump.make_database_dump_filename(
-            make_dump_path(location_config), database['name'], database.get('hostname')
-        )
-        restore_command = (
-            ('mysql', '--batch')
-            + (('--host', database['hostname']) if 'hostname' in database else ())
-            + (('--port', str(database['port'])) if 'port' in database else ())
-            + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
-            + (('--user', database['username']) if 'username' in database else ())
-        )
-        extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None
+    if len(database_config) != 1:
+        raise ValueError('The database configuration value is invalid')
 
-        logger.debug(
-            '{}: Restoring MySQL database {}{}'.format(log_prefix, database['name'], dry_run_label)
-        )
-        if not dry_run:
-            execute_command(
-                restore_command, input_file=open(dump_filename), extra_environment=extra_environment
-            )
+    database = database_config[0]
+    restore_command = (
+        ('mysql', '--batch')
+        + (('--host', database['hostname']) if 'hostname' in database else ())
+        + (('--port', str(database['port'])) if 'port' in database else ())
+        + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
+        + (('--user', database['username']) if 'username' in database else ())
+    )
+    extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None
+
+    logger.debug(
+        '{}: Restoring MySQL database {}{}'.format(log_prefix, database['name'], dry_run_label)
+    )
+    if dry_run:
+        return
+
+    execute_command_with_processes(
+        restore_command,
+        [extract_process],
+        input_file=extract_process.stdout,
+        extra_environment=extra_environment,
+    )

+ 67 - 55
borgmatic/hooks/postgresql.py

@@ -1,7 +1,6 @@
 import logging
-import os
 
-from borgmatic.execute import execute_command
+from borgmatic.execute import execute_command, execute_command_with_processes
 from borgmatic.hooks import dump
 
 logger = logging.getLogger(__name__)
@@ -18,12 +17,16 @@ def make_dump_path(location_config):  # pragma: no cover
 
 def dump_databases(databases, log_prefix, location_config, dry_run):
     '''
-    Dump the given PostgreSQL databases to disk. The databases are supplied as a sequence of dicts,
-    one dict describing each database as per the configuration schema. Use the given log prefix in
-    any log entries. Use the given location configuration dict to construct the destination path. If
-    this is a dry run, then don't actually dump anything.
+    Dump the given PostgreSQL databases to named pipes. The databases are supplied as a sequence of
+    dicts, one dict describing each database as per the configuration schema. Use the given log
+    prefix in any log entries. Use the given location configuration dict to construct the
+    destination path.
+
+    Return a sequence of subprocess.Popen instances for the dump processes, each ready to write to a
+    named pipe. If this is a dry run, then don't actually dump anything and return an empty sequence.
     '''
     dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
+    processes = []
 
     logger.info('{}: Dumping PostgreSQL databases{}'.format(log_prefix, dry_run_label))
 
@@ -39,6 +42,7 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
                 '--no-password',
                 '--clean',
                 '--if-exists',
+                '--no-sync',
             )
             + ('--file', dump_filename)
             + (('--host', database['hostname']) if 'hostname' in database else ())
@@ -55,9 +59,16 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
                 log_prefix, name, dump_filename, dry_run_label
             )
         )
-        if not dry_run:
-            os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
-            execute_command(command, extra_environment=extra_environment)
+        if dry_run:
+            continue
+
+        dump.create_named_pipe_for_dump(dump_filename)
+
+        processes.append(
+            execute_command(command, extra_environment=extra_environment, run_to_completion=False)
+        )
+
+    return processes
 
 
 def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # pragma: no cover
@@ -72,60 +83,61 @@ def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # p
     )
 
 
-def make_database_dump_patterns(databases, log_prefix, location_config, names):
+def make_database_dump_pattern(databases, log_prefix, location_config, name=None):
     '''
     Given a sequence of configurations dicts, a prefix to log with, a location configuration dict,
-    and a sequence of database names to match, return the corresponding glob patterns to match the
-    database dumps in an archive. An empty sequence of names indicates that the patterns should
-    match all dumps.
+    and a database name to match, return the corresponding glob pattern to match the database dump
+    in an archive.
     '''
-    return [
-        dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
-        for name in (names or ['*'])
-    ]
+    return dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
 
 
-def restore_database_dumps(databases, log_prefix, location_config, dry_run):
+def restore_database_dump(database_config, log_prefix, location_config, dry_run, extract_process):
     '''
-    Restore the given PostgreSQL databases from disk. The databases are supplied as a sequence of
-    dicts, one dict describing each database as per the configuration schema. Use the given log
-    prefix in any log entries. Use the given location configuration dict to construct the
-    destination path. If this is a dry run, then don't actually restore anything.
+    Restore the given PostgreSQL database from an extract stream. The database is supplied as a
+    one-element sequence containing a dict describing the database, as per the configuration schema.
+    Use the given log prefix in any log entries. If this is a dry run, then don't actually restore
+    anything. Trigger the given active extract process (an instance of subprocess.Popen) to produce
+    output to consume.
     '''
     dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else ''
 
-    for database in databases:
-        dump_filename = dump.make_database_dump_filename(
-            make_dump_path(location_config), database['name'], database.get('hostname')
-        )
-        all_databases = bool(database['name'] == 'all')
-        analyze_command = (
-            ('psql', '--no-password', '--quiet')
-            + (('--host', database['hostname']) if 'hostname' in database else ())
-            + (('--port', str(database['port'])) if 'port' in database else ())
-            + (('--username', database['username']) if 'username' in database else ())
-            + (('--dbname', database['name']) if not all_databases else ())
-            + ('--command', 'ANALYZE')
-        )
-        restore_command = (
-            ('psql' if all_databases else 'pg_restore', '--no-password')
-            + (
-                ('--if-exists', '--exit-on-error', '--clean', '--dbname', database['name'])
-                if not all_databases
-                else ()
-            )
-            + (('--host', database['hostname']) if 'hostname' in database else ())
-            + (('--port', str(database['port'])) if 'port' in database else ())
-            + (('--username', database['username']) if 'username' in database else ())
-            + (('-f', dump_filename) if all_databases else (dump_filename,))
+    if len(database_config) != 1:
+        raise ValueError('The database configuration value is invalid')
+
+    database = database_config[0]
+    all_databases = bool(database['name'] == 'all')
+    analyze_command = (
+        ('psql', '--no-password', '--quiet')
+        + (('--host', database['hostname']) if 'hostname' in database else ())
+        + (('--port', str(database['port'])) if 'port' in database else ())
+        + (('--username', database['username']) if 'username' in database else ())
+        + (('--dbname', database['name']) if not all_databases else ())
+        + ('--command', 'ANALYZE')
+    )
+    restore_command = (
+        ('psql' if all_databases else 'pg_restore', '--no-password')
+        + (
+            ('--if-exists', '--exit-on-error', '--clean', '--dbname', database['name'])
+            if not all_databases
+            else ()
         )
-        extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None
+        + (('--host', database['hostname']) if 'hostname' in database else ())
+        + (('--port', str(database['port'])) if 'port' in database else ())
+        + (('--username', database['username']) if 'username' in database else ())
+    )
+    extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None
 
-        logger.debug(
-            '{}: Restoring PostgreSQL database {}{}'.format(
-                log_prefix, database['name'], dry_run_label
-            )
-        )
-        if not dry_run:
-            execute_command(restore_command, extra_environment=extra_environment)
-            execute_command(analyze_command, extra_environment=extra_environment)
+    logger.debug(
+        '{}: Restoring PostgreSQL database {}{}'.format(log_prefix, database['name'], dry_run_label)
+    )
+    if dry_run:
+        return
+
+    execute_command_with_processes(
+        restore_command,
+        [extract_process],
+        input_file=extract_process.stdout,
+        extra_environment=extra_environment,
+    )
+    execute_command(analyze_command, extra_environment=extra_environment)

+ 6 - 6
docs/how-to/backup-your-databases.md

@@ -22,13 +22,13 @@ hooks:
         - name: posts
 ```
 
-Prior to each backup, borgmatic dumps each configured database to a file
-and includes it in the backup. After the backup completes, borgmatic removes
-the database dump files to recover disk space.
+As part of each backup, borgmatic streams a database dump for each configured
+database directly to Borg, so it's included in the backup without consuming
+additional disk space.
 
-borgmatic creates these temporary dump files in `~/.borgmatic` by default. To
-customize this path, set the `borgmatic_source_directory` option in the
-`location` section of borgmatic's configuration.
+To support this, borgmatic creates temporary named pipes in `~/.borgmatic` by
+default. To customize this path, set the `borgmatic_source_directory` option
+in the `location` section of borgmatic's configuration.
 
 Here's a more involved example that connects to remote databases:
 

+ 5 - 21
tests/integration/test_execute.py

@@ -14,20 +14,12 @@ def test_log_output_logs_each_line_separately():
 
     hi_process = subprocess.Popen(['echo', 'hi'], stdout=subprocess.PIPE)
     module.log_output(
-        ['echo', 'hi'],
-        hi_process,
-        hi_process.stdout,
-        output_log_level=logging.INFO,
-        error_on_warnings=False,
+        hi_process, hi_process.stdout, output_log_level=logging.INFO, error_on_warnings=False
     )
 
     there_process = subprocess.Popen(['echo', 'there'], stdout=subprocess.PIPE)
     module.log_output(
-        ['echo', 'there'],
-        there_process,
-        there_process.stdout,
-        output_log_level=logging.INFO,
-        error_on_warnings=False,
+        there_process, there_process.stdout, output_log_level=logging.INFO, error_on_warnings=False
     )
 
 
@@ -39,11 +31,7 @@ def test_log_output_includes_error_output_in_exception():
 
     with pytest.raises(subprocess.CalledProcessError) as error:
         module.log_output(
-            ['grep'],
-            process,
-            process.stdout,
-            output_log_level=logging.INFO,
-            error_on_warnings=False,
+            process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False
         )
 
     assert error.value.returncode == 2
@@ -59,11 +47,7 @@ def test_log_output_truncates_long_error_output():
 
     with pytest.raises(subprocess.CalledProcessError) as error:
         module.log_output(
-            ['grep'],
-            process,
-            process.stdout,
-            output_log_level=logging.INFO,
-            error_on_warnings=False,
+            process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False
         )
 
     assert error.value.returncode == 2
@@ -76,5 +60,5 @@ def test_log_output_with_no_output_logs_nothing():
 
     process = subprocess.Popen(['true'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     module.log_output(
-        ['true'], process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False
+        process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False
     )

+ 0 - 113
tests/unit/hooks/test_dump.py

@@ -34,29 +34,6 @@ def test_make_database_dump_filename_with_invalid_name_raises():
         module.make_database_dump_filename('databases', 'invalid/name')
 
 
-def test_flatten_dump_patterns_produces_list_of_all_patterns():
-    dump_patterns = {'postgresql_databases': ['*/glob', 'glob/*'], 'mysql_databases': ['*/*/*']}
-    expected_patterns = sorted(
-        dump_patterns['postgresql_databases'] + dump_patterns['mysql_databases']
-    )
-
-    assert sorted(module.flatten_dump_patterns(dump_patterns, ('bob',))) == expected_patterns
-
-
-def test_flatten_dump_patterns_with_no_patterns_errors():
-    dump_patterns = {'postgresql_databases': [], 'mysql_databases': []}
-
-    with pytest.raises(ValueError):
-        assert module.flatten_dump_patterns(dump_patterns, ('bob',))
-
-
-def test_flatten_dump_patterns_with_no_hooks_errors():
-    dump_patterns = {}
-
-    with pytest.raises(ValueError):
-        assert module.flatten_dump_patterns(dump_patterns, ('bob',))
-
-
 def test_remove_database_dumps_removes_dump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
     flexmock(module).should_receive('make_database_dump_filename').with_args(
@@ -107,93 +84,3 @@ def test_remove_database_dumps_without_databases_does_not_raise():
 
 def test_convert_glob_patterns_to_borg_patterns_removes_leading_slash():
     assert module.convert_glob_patterns_to_borg_patterns(('/etc/foo/bar',)) == ['sh:etc/foo/bar']
-
-
-def test_get_database_names_from_dumps_gets_names_from_filenames_matching_globs():
-    flexmock(module.glob).should_receive('glob').and_return(
-        ('databases/localhost/foo',)
-    ).and_return(('databases/localhost/bar',)).and_return(())
-
-    assert module.get_database_names_from_dumps(
-        ('databases/*/foo', 'databases/*/bar', 'databases/*/baz')
-    ) == ['foo', 'bar']
-
-
-def test_get_database_configurations_only_produces_named_databases():
-    databases = [
-        {'name': 'foo', 'hostname': 'example.org'},
-        {'name': 'bar', 'hostname': 'example.com'},
-        {'name': 'baz', 'hostname': 'example.org'},
-    ]
-
-    assert list(module.get_database_configurations(databases, ('foo', 'baz'))) == [
-        {'name': 'foo', 'hostname': 'example.org'},
-        {'name': 'baz', 'hostname': 'example.org'},
-    ]
-
-
-def test_get_database_configurations_matches_all_database():
-    databases = [
-        {'name': 'foo', 'hostname': 'example.org'},
-        {'name': 'all', 'hostname': 'example.com'},
-    ]
-
-    assert list(module.get_database_configurations(databases, ('foo', 'bar', 'baz'))) == [
-        {'name': 'foo', 'hostname': 'example.org'},
-        {'name': 'bar', 'hostname': 'example.com'},
-        {'name': 'baz', 'hostname': 'example.com'},
-    ]
-
-
-def test_get_per_hook_database_configurations_partitions_by_hook():
-    hooks = {'postgresql_databases': [flexmock()]}
-    names = ('foo', 'bar')
-    dump_patterns = flexmock()
-    expected_config = {'postgresql_databases': [{'name': 'foo'}, {'name': 'bar'}]}
-    flexmock(module).should_receive('get_database_configurations').with_args(
-        hooks['postgresql_databases'], names
-    ).and_return(expected_config['postgresql_databases'])
-
-    config = module.get_per_hook_database_configurations(hooks, names, dump_patterns)
-
-    assert config == expected_config
-
-
-def test_get_per_hook_database_configurations_defaults_to_detected_database_names():
-    hooks = {'postgresql_databases': [flexmock()]}
-    names = ()
-    detected_names = flexmock()
-    dump_patterns = {'postgresql_databases': [flexmock()]}
-    expected_config = {'postgresql_databases': [flexmock()]}
-    flexmock(module).should_receive('get_database_names_from_dumps').and_return(detected_names)
-    flexmock(module).should_receive('get_database_configurations').with_args(
-        hooks['postgresql_databases'], detected_names
-    ).and_return(expected_config['postgresql_databases'])
-
-    config = module.get_per_hook_database_configurations(hooks, names, dump_patterns)
-
-    assert config == expected_config
-
-
-def test_get_per_hook_database_configurations_with_unknown_database_name_raises():
-    hooks = {'postgresql_databases': [flexmock()]}
-    names = ('foo', 'bar')
-    dump_patterns = flexmock()
-    flexmock(module).should_receive('get_database_configurations').with_args(
-        hooks['postgresql_databases'], names
-    ).and_return([])
-
-    with pytest.raises(ValueError):
-        module.get_per_hook_database_configurations(hooks, names, dump_patterns)
-
-
-def test_get_per_hook_database_configurations_with_all_and_no_archive_dumps_raises():
-    hooks = {'postgresql_databases': [flexmock()]}
-    names = ('foo', 'all')
-    dump_patterns = flexmock()
-    flexmock(module).should_receive('get_database_configurations').with_args(
-        hooks['postgresql_databases'], names
-    ).and_return([])
-
-    with pytest.raises(ValueError):
-        module.get_per_hook_database_configurations(hooks, names, dump_patterns)

+ 109 - 99
tests/unit/hooks/test_mysql.py

@@ -1,5 +1,4 @@
-import sys
-
+import pytest
 from flexmock import flexmock
 
 from borgmatic.hooks import mysql as module
@@ -36,7 +35,7 @@ def test_database_names_to_dump_queries_mysql_for_database_names():
 
 def test_dump_databases_runs_mysqldump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
-    output_file = flexmock()
+    processes = [flexmock(), flexmock()]
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
@@ -44,17 +43,24 @@ def test_dump_databases_runs_mysqldump_for_each_database():
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)).and_return(
         ('bar',)
     )
-    flexmock(module.os).should_receive('makedirs')
-    flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
-    for name in ('foo', 'bar'):
+    for name, process in zip(('foo', 'bar'), processes):
         flexmock(module).should_receive('execute_command').with_args(
-            ('mysqldump', '--add-drop-database', '--databases', name),
-            output_file=output_file,
+            (
+                'mysqldump',
+                '--add-drop-database',
+                '--databases',
+                name,
+                '>',
+                'databases/localhost/{}'.format(name),
+            ),
+            shell=True,
             extra_environment=None,
-        ).once()
+            run_to_completion=False,
+        ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == processes
 
 
 def test_dump_databases_with_dry_run_skips_mysqldump():
@@ -66,7 +72,7 @@ def test_dump_databases_with_dry_run_skips_mysqldump():
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)).and_return(
         ('bar',)
     )
-    flexmock(module.os).should_receive('makedirs').never()
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump').never()
     flexmock(module).should_receive('execute_command').never()
 
     module.dump_databases(databases, 'test.yaml', {}, dry_run=True)
@@ -74,14 +80,13 @@ def test_dump_databases_with_dry_run_skips_mysqldump():
 
 def test_dump_databases_runs_mysqldump_with_hostname_and_port():
     databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
-    output_file = flexmock()
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/database.example.org/foo'
     )
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo',))
-    flexmock(module.os).should_receive('makedirs')
-    flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -95,128 +100,135 @@ def test_dump_databases_runs_mysqldump_with_hostname_and_port():
             'tcp',
             '--databases',
             'foo',
+            '>',
+            'databases/database.example.org/foo',
         ),
-        output_file=output_file,
+        shell=True,
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_mysqldump_with_username_and_password():
     databases = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}]
-    output_file = flexmock()
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     )
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo',))
-    flexmock(module.os).should_receive('makedirs')
-    flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
-        ('mysqldump', '--add-drop-database', '--user', 'root', '--databases', 'foo'),
-        output_file=output_file,
+        (
+            'mysqldump',
+            '--add-drop-database',
+            '--user',
+            'root',
+            '--databases',
+            'foo',
+            '>',
+            'databases/localhost/foo',
+        ),
+        shell=True,
         extra_environment={'MYSQL_PWD': 'trustsome1'},
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_mysqldump_with_options():
     databases = [{'name': 'foo', 'options': '--stuff=such'}]
-    output_file = flexmock()
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     )
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo',))
-    flexmock(module.os).should_receive('makedirs')
-    flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
-        ('mysqldump', '--add-drop-database', '--stuff=such', '--databases', 'foo'),
-        output_file=output_file,
+        (
+            'mysqldump',
+            '--add-drop-database',
+            '--stuff=such',
+            '--databases',
+            'foo',
+            '>',
+            'databases/localhost/foo',
+        ),
+        shell=True,
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_mysqldump_for_all_databases():
     databases = [{'name': 'all'}]
-    output_file = flexmock()
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/all'
     )
     flexmock(module).should_receive('database_names_to_dump').and_return(('foo', 'bar'))
-    flexmock(module.os).should_receive('makedirs')
-    flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
-        ('mysqldump', '--add-drop-database', '--databases', 'foo', 'bar'),
-        output_file=output_file,
+        (
+            'mysqldump',
+            '--add-drop-database',
+            '--databases',
+            'foo',
+            'bar',
+            '>',
+            'databases/localhost/all',
+        ),
+        shell=True,
         extra_environment=None,
-    ).once()
-
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+        run_to_completion=False,
+    ).and_return(process).once()
 
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
-def test_make_database_dump_patterns_converts_names_to_glob_paths():
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/*/foo'
-    ).and_return('databases/*/bar')
-
-    assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ('foo', 'bar')) == [
-        'databases/*/foo',
-        'databases/*/bar',
-    ]
 
+def test_restore_database_dump_runs_mysql_to_restore():
+    database_config = [{'name': 'foo'}]
+    extract_process = flexmock(stdout=flexmock())
 
-def test_make_database_dump_patterns_treats_empty_names_as_matching_all_databases():
-    flexmock(module).should_receive('make_dump_path').and_return('/dump/path')
-    flexmock(module.dump).should_receive('make_database_dump_filename').with_args(
-        '/dump/path', '*', '*'
-    ).and_return('databases/*/*')
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
+        ('mysql', '--batch'),
+        processes=[extract_process],
+        input_file=extract_process.stdout,
+        extra_environment=None,
+    ).once()
 
-    assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ()) == ['databases/*/*']
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )
 
 
-def test_restore_database_dumps_restores_each_database():
-    databases = [{'name': 'foo'}, {'name': 'bar'}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    ).and_return('databases/localhost/bar')
+def test_restore_database_dump_errors_on_multiple_database_config():
+    database_config = [{'name': 'foo'}, {'name': 'bar'}]
 
-    for name in ('foo', 'bar'):
-        dump_filename = 'databases/localhost/{}'.format(name)
-        input_file = flexmock()
-        flexmock(sys.modules['builtins']).should_receive('open').with_args(
-            dump_filename
-        ).and_return(input_file)
-        flexmock(module).should_receive('execute_command').with_args(
-            ('mysql', '--batch'), input_file=input_file, extra_environment=None
-        ).once()
+    flexmock(module).should_receive('execute_command_with_processes').never()
+    flexmock(module).should_receive('execute_command').never()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    with pytest.raises(ValueError):
+        module.restore_database_dump(
+            database_config, 'test.yaml', {}, dry_run=False, extract_process=flexmock()
+        )
 
 
-def test_restore_database_dumps_runs_mysql_with_hostname_and_port():
-    databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    )
-    dump_filename = 'databases/localhost/foo'
-    input_file = flexmock()
-    flexmock(sys.modules['builtins']).should_receive('open').with_args(dump_filename).and_return(
-        input_file
-    )
+def test_restore_database_dump_runs_mysql_with_hostname_and_port():
+    database_config = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
+    extract_process = flexmock(stdout=flexmock())
 
-    flexmock(module).should_receive('execute_command').with_args(
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
         (
             'mysql',
             '--batch',
@@ -227,29 +239,27 @@ def test_restore_database_dumps_runs_mysql_with_hostname_and_port():
             '--protocol',
             'tcp',
         ),
-        input_file=input_file,
+        processes=[extract_process],
+        input_file=extract_process.stdout,
         extra_environment=None,
     ).once()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )
 
 
-def test_restore_database_dumps_runs_mysql_with_username_and_password():
-    databases = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    )
-    dump_filename = 'databases/localhost/foo'
-    input_file = flexmock()
-    flexmock(sys.modules['builtins']).should_receive('open').with_args(dump_filename).and_return(
-        input_file
-    )
+def test_restore_database_dump_runs_mysql_with_username_and_password():
+    database_config = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}]
+    extract_process = flexmock(stdout=flexmock())
 
-    flexmock(module).should_receive('execute_command').with_args(
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
         ('mysql', '--batch', '--user', 'root'),
-        input_file=input_file,
+        processes=[extract_process],
+        input_file=extract_process.stdout,
         extra_environment={'MYSQL_PWD': 'trustsome1'},
     ).once()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )

+ 101 - 90
tests/unit/hooks/test_postgresql.py

@@ -1,3 +1,4 @@
+import pytest
 from flexmock import flexmock
 
 from borgmatic.hooks import postgresql as module
@@ -5,19 +6,21 @@ from borgmatic.hooks import postgresql as module
 
 def test_dump_databases_runs_pg_dump_for_each_database():
     databases = [{'name': 'foo'}, {'name': 'bar'}]
+    processes = [flexmock(), flexmock()]
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     ).and_return('databases/localhost/bar')
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
-    for name in ('foo', 'bar'):
+    for name, process in zip(('foo', 'bar'), processes):
         flexmock(module).should_receive('execute_command').with_args(
             (
                 'pg_dump',
                 '--no-password',
                 '--clean',
                 '--if-exists',
+                '--no-sync',
                 '--file',
                 'databases/localhost/{}'.format(name),
                 '--format',
@@ -25,9 +28,10 @@ def test_dump_databases_runs_pg_dump_for_each_database():
                 name,
             ),
             extra_environment=None,
-        ).once()
+            run_to_completion=False,
+        ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == processes
 
 
 def test_dump_databases_with_dry_run_skips_pg_dump():
@@ -36,19 +40,20 @@ def test_dump_databases_with_dry_run_skips_pg_dump():
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     ).and_return('databases/localhost/bar')
-    flexmock(module.os).should_receive('makedirs').never()
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump').never()
     flexmock(module).should_receive('execute_command').never()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=True)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=True) == []
 
 
 def test_dump_databases_runs_pg_dump_with_hostname_and_port():
     databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/database.example.org/foo'
     )
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -56,6 +61,7 @@ def test_dump_databases_runs_pg_dump_with_hostname_and_port():
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
             '--file',
             'databases/database.example.org/foo',
             '--host',
@@ -67,18 +73,20 @@ def test_dump_databases_runs_pg_dump_with_hostname_and_port():
             'foo',
         ),
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_pg_dump_with_username_and_password():
     databases = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}]
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     )
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -86,6 +94,7 @@ def test_dump_databases_runs_pg_dump_with_username_and_password():
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
             '--file',
             'databases/localhost/foo',
             '--username',
@@ -95,18 +104,20 @@ def test_dump_databases_runs_pg_dump_with_username_and_password():
             'foo',
         ),
         extra_environment={'PGPASSWORD': 'trustsome1'},
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_pg_dump_with_format():
     databases = [{'name': 'foo', 'format': 'tar'}]
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     )
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -114,6 +125,7 @@ def test_dump_databases_runs_pg_dump_with_format():
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
             '--file',
             'databases/localhost/foo',
             '--format',
@@ -121,18 +133,20 @@ def test_dump_databases_runs_pg_dump_with_format():
             'foo',
         ),
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_pg_dump_with_options():
     databases = [{'name': 'foo', 'options': '--stuff=such'}]
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/foo'
     )
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -140,6 +154,7 @@ def test_dump_databases_runs_pg_dump_with_options():
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
             '--file',
             'databases/localhost/foo',
             '--format',
@@ -148,18 +163,20 @@ def test_dump_databases_runs_pg_dump_with_options():
             'foo',
         ),
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
 
 def test_dump_databases_runs_pg_dumpall_for_all_databases():
     databases = [{'name': 'all'}]
+    process = flexmock()
     flexmock(module).should_receive('make_dump_path').and_return('')
     flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
         'databases/localhost/all'
     )
-    flexmock(module.os).should_receive('makedirs')
+    flexmock(module.dump).should_receive('create_named_pipe_for_dump')
 
     flexmock(module).should_receive('execute_command').with_args(
         (
@@ -167,73 +184,62 @@ def test_dump_databases_runs_pg_dumpall_for_all_databases():
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
             '--file',
             'databases/localhost/all',
         ),
         extra_environment=None,
-    ).once()
+        run_to_completion=False,
+    ).and_return(process).once()
 
-    module.dump_databases(databases, 'test.yaml', {}, dry_run=False)
-
-
-def test_make_database_dump_patterns_converts_names_to_glob_paths():
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/*/foo'
-    ).and_return('databases/*/bar')
+    assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process]
 
-    assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ('foo', 'bar')) == [
-        'databases/*/foo',
-        'databases/*/bar',
-    ]
 
+def test_restore_database_dump_runs_pg_restore():
+    database_config = [{'name': 'foo'}]
+    extract_process = flexmock(stdout=flexmock())
 
-def test_make_database_dump_patterns_treats_empty_names_as_matching_all_databases():
-    flexmock(module).should_receive('make_dump_path').and_return('/dump/path')
-    flexmock(module.dump).should_receive('make_database_dump_filename').with_args(
-        '/dump/path', '*', '*'
-    ).and_return('databases/*/*')
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
+        (
+            'pg_restore',
+            '--no-password',
+            '--if-exists',
+            '--exit-on-error',
+            '--clean',
+            '--dbname',
+            'foo',
+        ),
+        processes=[extract_process],
+        input_file=extract_process.stdout,
+        extra_environment=None,
+    ).once()
+    flexmock(module).should_receive('execute_command').with_args(
+        ('psql', '--no-password', '--quiet', '--dbname', 'foo', '--command', 'ANALYZE'),
+        extra_environment=None,
+    ).once()
 
-    assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ()) == ['databases/*/*']
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )
 
 
-def test_restore_database_dumps_restores_each_database():
-    databases = [{'name': 'foo'}, {'name': 'bar'}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    ).and_return('databases/localhost/bar')
+def test_restore_database_dump_errors_on_multiple_database_config():
+    database_config = [{'name': 'foo'}, {'name': 'bar'}]
 
-    for name in ('foo', 'bar'):
-        flexmock(module).should_receive('execute_command').with_args(
-            (
-                'pg_restore',
-                '--no-password',
-                '--if-exists',
-                '--exit-on-error',
-                '--clean',
-                '--dbname',
-                name,
-                'databases/localhost/{}'.format(name),
-            ),
-            extra_environment=None,
-        ).once()
-        flexmock(module).should_receive('execute_command').with_args(
-            ('psql', '--no-password', '--quiet', '--dbname', name, '--command', 'ANALYZE'),
-            extra_environment=None,
-        ).once()
+    flexmock(module).should_receive('execute_command_with_processes').never()
+    flexmock(module).should_receive('execute_command').never()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    with pytest.raises(ValueError):
+        module.restore_database_dump(
+            database_config, 'test.yaml', {}, dry_run=False, extract_process=flexmock()
+        )
 
 
-def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port():
-    databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    )
+def test_restore_database_dump_runs_pg_restore_with_hostname_and_port():
+    database_config = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
+    extract_process = flexmock(stdout=flexmock())
 
-    flexmock(module).should_receive('execute_command').with_args(
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
         (
             'pg_restore',
             '--no-password',
@@ -246,8 +252,9 @@ def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port():
             'database.example.org',
             '--port',
             '5433',
-            'databases/localhost/foo',
         ),
+        processes=[extract_process],
+        input_file=extract_process.stdout,
         extra_environment=None,
     ).once()
     flexmock(module).should_receive('execute_command').with_args(
@@ -267,17 +274,16 @@ def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port():
         extra_environment=None,
     ).once()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )
 
 
-def test_restore_database_dumps_runs_pg_restore_with_username_and_password():
-    databases = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/foo'
-    )
+def test_restore_database_dump_runs_pg_restore_with_username_and_password():
+    database_config = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}]
+    extract_process = flexmock(stdout=flexmock())
 
-    flexmock(module).should_receive('execute_command').with_args(
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
         (
             'pg_restore',
             '--no-password',
@@ -288,8 +294,9 @@ def test_restore_database_dumps_runs_pg_restore_with_username_and_password():
             'foo',
             '--username',
             'postgres',
-            'databases/localhost/foo',
         ),
+        processes=[extract_process],
+        input_file=extract_process.stdout,
         extra_environment={'PGPASSWORD': 'trustsome1'},
     ).once()
     flexmock(module).should_receive('execute_command').with_args(
@@ -307,21 +314,25 @@ def test_restore_database_dumps_runs_pg_restore_with_username_and_password():
         extra_environment={'PGPASSWORD': 'trustsome1'},
     ).once()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )
 
 
-def test_restore_database_dumps_runs_psql_for_all_database_dump():
-    databases = [{'name': 'all'}]
-    flexmock(module).should_receive('make_dump_path').and_return('')
-    flexmock(module.dump).should_receive('make_database_dump_filename').and_return(
-        'databases/localhost/all'
-    )
+def test_restore_database_dump_runs_psql_for_all_database_dump():
+    database_config = [{'name': 'all'}]
+    extract_process = flexmock(stdout=flexmock())
 
-    flexmock(module).should_receive('execute_command').with_args(
-        ('psql', '--no-password', '-f', 'databases/localhost/all'), extra_environment=None
+    flexmock(module).should_receive('execute_command_with_processes').with_args(
+        ('psql', '--no-password'),
+        processes=[extract_process],
+        input_file=extract_process.stdout,
+        extra_environment=None,
     ).once()
     flexmock(module).should_receive('execute_command').with_args(
         ('psql', '--no-password', '--quiet', '--command', 'ANALYZE'), extra_environment=None
     ).once()
 
-    module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False)
+    module.restore_database_dump(
+        database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process
+    )

+ 19 - 1
tests/unit/test_execute.py

@@ -87,7 +87,7 @@ def test_execute_command_calls_full_command_with_shell():
     full_command = ['foo', 'bar']
     flexmock(module.os, environ={'a': 'b'})
     flexmock(module.subprocess).should_receive('Popen').with_args(
-        full_command,
+        ' '.join(full_command),
         stdin=None,
         stdout=module.subprocess.PIPE,
         stderr=module.subprocess.STDOUT,
@@ -140,6 +140,24 @@ def test_execute_command_calls_full_command_with_working_directory():
     assert output is None
 
 
+def test_execute_command_without_run_to_completion_returns_process():
+    full_command = ['foo', 'bar']
+    process = flexmock()
+    flexmock(module.os, environ={'a': 'b'})
+    flexmock(module.subprocess).should_receive('Popen').with_args(
+        full_command,
+        stdin=None,
+        stdout=module.subprocess.PIPE,
+        stderr=module.subprocess.STDOUT,
+        shell=False,
+        env=None,
+        cwd=None,
+    ).and_return(process).once()
+    flexmock(module).should_receive('log_output')
+
+    assert module.execute_command(full_command, run_to_completion=False) == process
+
+
 def test_execute_command_captures_output():
     full_command = ['foo', 'bar']
     expected_output = '[]'