ZFS snapshots WIP (#261).

Dan Helfman, 9 months ago
parent commit ab43ef00ce

+ 2 - 0
borgmatic/actions/check.py

@@ -8,6 +8,7 @@ import pathlib
 import random
 import shutil

+import borgmatic.actions.create
 import borgmatic.borg.check
 import borgmatic.borg.create
 import borgmatic.borg.environment
@@ -367,6 +368,7 @@ def collect_spot_check_source_paths(
             repository_path=repository['path'],
             config=config,
             config_paths=(),
+            source_directories=borgmatic.actions.create.process_source_directories(config),
             local_borg_version=local_borg_version,
             global_arguments=global_arguments,
             borgmatic_runtime_directories=(),

+ 133 - 0
borgmatic/actions/create.py

@@ -1,7 +1,10 @@
+import glob
 import importlib.metadata
+import itertools
 import json
 import logging
 import os
+import pathlib

 import borgmatic.actions.json
 import borgmatic.borg.create
@@ -40,6 +43,131 @@ def create_borgmatic_manifest(config, config_paths, borgmatic_runtime_directory,
         )


+def expand_directory(directory, working_directory):
+    '''
+    Given a directory path, expand any tilde (representing a user's home directory) and any globs
+    therein. Return a list of one or more resulting paths.
+    '''
+    expanded_directory = os.path.join(working_directory or '', os.path.expanduser(directory))
+
+    return glob.glob(expanded_directory) or [expanded_directory]
+
+
+def expand_directories(directories, working_directory=None):
+    '''
+    Given a sequence of directory paths and an optional working directory, expand tildes and globs
+    in each one. Return all the resulting directories as a single flattened tuple.
+    '''
+    if directories is None:
+        return ()
+
+    return tuple(
+        itertools.chain.from_iterable(
+            expand_directory(directory, working_directory) for directory in directories
+        )
+    )
+
+
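As a usage sketch (the paths and glob matches here are hypothetical), the two helpers above behave like this:

    >>> expand_directory('~/src', working_directory=None)
    ['/home/user/src']
    >>> expand_directory('/var/log/app*', working_directory=None)
    ['/var/log/app1', '/var/log/app2']
    >>> expand_directory('/nonexistent/*', working_directory=None)  # no match: literal fallback
    ['/nonexistent/*']
    >>> expand_directories(('~/src', '/var/log/app*'))
    ('/home/user/src', '/var/log/app1', '/var/log/app2')
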
+def map_directories_to_devices(directories, working_directory=None):
+    '''
+    Given a sequence of directories and an optional working directory, return a map from directory
+    to an identifier for the device on which that directory resides or None if the path doesn't
+    exist.
+
+    This is handy for determining whether two different directories are on the same filesystem (have
+    the same device identifier).
+    '''
+    return {
+        directory: os.stat(full_directory).st_dev if os.path.exists(full_directory) else None
+        for directory in directories
+        for full_directory in (os.path.join(working_directory or '', directory),)
+    }
+
+
+def deduplicate_directories(directory_devices, additional_directory_devices):
+    '''
+    Given a map from directory to the identifier for the device on which that directory resides,
+    return the directories as a sorted sequence with all duplicate child directories removed. For
+    instance, if paths is ['/foo', '/foo/bar'], return just: ['/foo']
+
+    The one exception to this rule is if two paths are on different filesystems (devices). In that
+    case, they won't get de-duplicated in case they both need to be passed to Borg (e.g. the
+    location.one_file_system option is true).
+
+    The idea is that if Borg is given a parent directory, then it doesn't also need to be given
+    child directories, because it will naturally spider the contents of the parent directory. And
+    there are cases where Borg coming across the same file twice will result in duplicate reads and
+    even hangs, e.g. when a database hook is using a named pipe for streaming database dumps to
+    Borg.
+
+    If any additional directory devices are given, also deduplicate against them, but don't include
+    them in the returned directories.
+    '''
+    deduplicated = set()
+    directories = sorted(directory_devices.keys())
+    additional_directories = sorted(additional_directory_devices.keys())
+    all_devices = {**directory_devices, **additional_directory_devices}
+
+    for directory in directories:
+        deduplicated.add(directory)
+        parents = pathlib.PurePath(directory).parents
+
+        # If another directory in the given list (or the additional list) is a parent of the
+        # current directory (even n levels up) and both are on the same filesystem, then the
+        # current directory is a duplicate.
+        for other_directory in directories + additional_directories:
+            for parent in parents:
+                if (
+                    pathlib.PurePath(other_directory) == parent
+                    and all_devices[directory] is not None
+                    and all_devices[other_directory] == all_devices[directory]
+                ):
+                    if directory in deduplicated:
+                        deduplicated.remove(directory)
+                    break
+
+    return sorted(deduplicated)
+
+
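A minimal sketch of the resulting behavior, assuming '/' and '/home' sit on different devices (the device identifiers below are made up):

    >>> deduplicate_directories(
    ...     {'/etc': 1, '/etc/ssh': 1, '/home': 2, '/home/user': 2},
    ...     additional_directory_devices={},
    ... )
    ['/etc', '/home']
    >>> deduplicate_directories({'/srv': 1, '/srv/backups': 2}, additional_directory_devices={})
    ['/srv', '/srv/backups']

Each child collapses into its same-device parent, while the child on a different device survives so that it can still be passed to Borg when one_file_system is in play.
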
+def pattern_root_directories(patterns=None):
+    '''
+    Given a sequence of patterns, parse out and return just the root directories.
+    '''
+    if not patterns:
+        return []
+
+    return [
+        pattern.split(ROOT_PATTERN_PREFIX, maxsplit=1)[1]
+        for pattern in patterns
+        if pattern.startswith(ROOT_PATTERN_PREFIX)
+    ]
+
+
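For example, given a Borg-style pattern list, only the "R " (root) entries contribute paths (ROOT_PATTERN_PREFIX is defined in borgmatic/borg/create.py):

    >>> ROOT_PATTERN_PREFIX = 'R '
    >>> pattern_root_directories(['R /home', '+ /home/user/.cache', 'R /etc'])
    ['/home', '/etc']
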
+def process_source_directories(config, config_paths, borgmatic_runtime_directory):
+    '''
+    Given a configuration dict, a sequence of configuration paths, and the borgmatic runtime
+    directory, expand and deduplicate the source directories from them.
+    '''
+    working_directory = borgmatic.config.paths.get_working_directory(config)
+
+    return deduplicate_directories(
+        map_directories_to_devices(
+            expand_directories(
+                tuple(config.get('source_directories', ()))
+                + (borgmatic_runtime_directory,)
+                + tuple(config_paths if config.get('store_config_files', True) else ()),
+                working_directory=working_directory,
+            )
+        ),
+        additional_directory_devices=map_directories_to_devices(
+            expand_directories(
+                pattern_root_directories(config.get('patterns')),
+                working_directory=working_directory,
+            )
+        ),
+    )
+
+
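Putting it together, a hypothetical call (all paths invented) might look like:

    config = {
        'source_directories': ['/etc', '/etc/ssh', '~/data*'],
        'store_config_files': True,
        'patterns': ['R /home'],
    }
    process_source_directories(
        config,
        config_paths=('/etc/borgmatic/config.yaml',),
        borgmatic_runtime_directory='/run/user/1000/borgmatic',
    )

Tildes and globs get expanded, the runtime directory and stored config paths are appended, and duplicate children (like '/etc/ssh', or anything under the '/home' pattern root) are dropped before the result is handed to Borg.
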
 def run_create(
     config_filename,
     repository,
@@ -86,12 +214,16 @@ def run_create(
             borgmatic_runtime_directory,
             global_arguments.dry_run,
         )
+        source_directories = process_source_directories(
+            config, config_paths, borgmatic_runtime_directory
+        )
         active_dumps = borgmatic.hooks.dispatch.call_hooks(
             'dump_data_sources',
             config,
             repository['path'],
             borgmatic.hooks.dump.DATA_SOURCE_HOOK_NAMES,
             borgmatic_runtime_directory,
+            source_directories,
             global_arguments.dry_run,
         )
         stream_processes = [process for processes in active_dumps.values() for process in processes]
@@ -109,6 +241,7 @@ def run_create(
             repository['path'],
             config,
             config_paths,
+            source_directories,
             local_borg_version,
             global_arguments,
             borgmatic_runtime_directory,

+ 10 - 136
borgmatic/borg/create.py

@@ -1,4 +1,3 @@
-import glob
 import itertools
 import logging
 import os
@@ -20,31 +19,6 @@ from borgmatic.execute import (
 logger = logging.getLogger(__name__)


-def expand_directory(directory, working_directory):
-    '''
-    Given a directory path, expand any tilde (representing a user's home directory) and any globs
-    therein. Return a list of one or more resulting paths.
-    '''
-    expanded_directory = os.path.join(working_directory or '', os.path.expanduser(directory))
-
-    return glob.glob(expanded_directory) or [expanded_directory]
-
-
-def expand_directories(directories, working_directory=None):
-    '''
-    Given a sequence of directory paths and an optional working directory, expand tildes and globs
-    in each one. Return all the resulting directories as a single flattened tuple.
-    '''
-    if directories is None:
-        return ()
-
-    return tuple(
-        itertools.chain.from_iterable(
-            expand_directory(directory, working_directory) for directory in directories
-        )
-    )
-
-
 def expand_home_directories(directories):
     '''
     Given a sequence of directory paths, expand tildes in each one. Do not perform any globbing.
@@ -56,67 +30,6 @@ def expand_home_directories(directories):
     return tuple(os.path.expanduser(directory) for directory in directories)


-def map_directories_to_devices(directories, working_directory=None):
-    '''
-    Given a sequence of directories and an optional working directory, return a map from directory
-    to an identifier for the device on which that directory resides or None if the path doesn't
-    exist.
-
-    This is handy for determining whether two different directories are on the same filesystem (have
-    the same device identifier).
-    '''
-    return {
-        directory: os.stat(full_directory).st_dev if os.path.exists(full_directory) else None
-        for directory in directories
-        for full_directory in (os.path.join(working_directory or '', directory),)
-    }
-
-
-def deduplicate_directories(directory_devices, additional_directory_devices):
-    '''
-    Given a map from directory to the identifier for the device on which that directory resides,
-    return the directories as a sorted tuple with all duplicate child directories removed. For
-    instance, if paths is ('/foo', '/foo/bar'), return just: ('/foo',)
-
-    The one exception to this rule is if two paths are on different filesystems (devices). In that
-    case, they won't get de-duplicated in case they both need to be passed to Borg (e.g. the
-    location.one_file_system option is true).
-
-    The idea is that if Borg is given a parent directory, then it doesn't also need to be given
-    child directories, because it will naturally spider the contents of the parent directory. And
-    there are cases where Borg coming across the same file twice will result in duplicate reads and
-    even hangs, e.g. when a database hook is using a named pipe for streaming database dumps to
-    Borg.
-
-    If any additional directory devices are given, also deduplicate against them, but don't include
-    them in the returned directories.
-    '''
-    deduplicated = set()
-    directories = sorted(directory_devices.keys())
-    additional_directories = sorted(additional_directory_devices.keys())
-    all_devices = {**directory_devices, **additional_directory_devices}
-
-    for directory in directories:
-        deduplicated.add(directory)
-        parents = pathlib.PurePath(directory).parents
-
-        # If another directory in the given list (or the additional list) is a parent of current
-        # directory (even n levels up) and both are on the same filesystem, then the current
-        # directory is a duplicate.
-        for other_directory in directories + additional_directories:
-            for parent in parents:
-                if (
-                    pathlib.PurePath(other_directory) == parent
-                    and all_devices[directory] is not None
-                    and all_devices[other_directory] == all_devices[directory]
-                ):
-                    if directory in deduplicated:
-                        deduplicated.remove(directory)
-                    break
-
-    return tuple(sorted(deduplicated))
-
-
 def write_pattern_file(patterns=None, sources=None, pattern_file=None):
     '''
     Given a sequence of patterns and an optional sequence of source directories, write them to a
@@ -221,32 +134,9 @@ def make_list_filter_flags(local_borg_version, dry_run):
         return f'{base_flags}-'


-def collect_borgmatic_runtime_directories(borgmatic_runtime_directory):
-    '''
-    Return a list of borgmatic-specific runtime directories used for temporary runtime data like
-    streaming database dumps and bootstrap metadata. If no such directories exist, return an empty
-    list.
-    '''
-    return [borgmatic_runtime_directory] if os.path.exists(borgmatic_runtime_directory) else []
-
-
 ROOT_PATTERN_PREFIX = 'R '


-def pattern_root_directories(patterns=None):
-    '''
-    Given a sequence of patterns, parse out and return just the root directories.
-    '''
-    if not patterns:
-        return []
-
-    return [
-        pattern.split(ROOT_PATTERN_PREFIX, maxsplit=1)[1]
-        for pattern in patterns
-        if pattern.startswith(ROOT_PATTERN_PREFIX)
-    ]
-
-
 def special_file(path):
     '''
     Return whether the given path is a special file (character device, block device, or named pipe
@@ -335,9 +225,10 @@ def make_base_create_command(
     repository_path,
     config,
     config_paths,
+    source_directories,
     local_borg_version,
     global_arguments,
-    borgmatic_runtime_directories,
+    borgmatic_runtime_directory,
     local_path='borg',
     remote_path=None,
     progress=False,
@@ -359,27 +250,10 @@ def make_base_create_command(
             config.get('source_directories'), working_directory=working_directory
         )

-    sources = deduplicate_directories(
-        map_directories_to_devices(
-            expand_directories(
-                tuple(config.get('source_directories', ()))
-                + borgmatic_runtime_directories
-                + tuple(config_paths if config.get('store_config_files', True) else ()),
-                working_directory=working_directory,
-            )
-        ),
-        additional_directory_devices=map_directories_to_devices(
-            expand_directories(
-                pattern_root_directories(config.get('patterns')),
-                working_directory=working_directory,
-            )
-        ),
-    )
-
     ensure_files_readable(config.get('patterns_from'), config.get('exclude_from'))

     pattern_file = (
-        write_pattern_file(config.get('patterns'), sources)
+        write_pattern_file(config.get('patterns'), source_directories)
         if config.get('patterns') or config.get('patterns_from')
         else None
     )
@@ -457,7 +331,7 @@

     create_positional_arguments = flags.make_repository_archive_flags(
         repository_path, archive_name_format, local_borg_version
-    ) + (sources if not pattern_file else ())
+    ) + (tuple(source_directories) if not pattern_file else ())

     # If database hooks are enabled (as indicated by streaming processes), exclude files that might
     # cause Borg to hang. But skip this if the user has explicitly set the "read_special" to True.
@@ -474,7 +348,9 @@ def make_base_create_command(
             local_path,
             working_directory,
             borg_environment,
-            skip_directories=borgmatic_runtime_directories,
+            skip_directories=(
+                [borgmatic_runtime_directory] if os.path.exists(borgmatic_runtime_directory) else []
+            ),
         )

         if special_file_paths:
@@ -502,6 +378,7 @@ def create_archive(
     repository_path,
     config,
     config_paths,
+    source_directories,
     local_borg_version,
     global_arguments,
     borgmatic_runtime_directory,
@@ -524,10 +401,6 @@ def create_archive(
     borgmatic.logger.add_custom_log_levels()

     working_directory = borgmatic.config.paths.get_working_directory(config)
-    borgmatic_runtime_directories = expand_directories(
-        collect_borgmatic_runtime_directories(borgmatic_runtime_directory),
-        working_directory=working_directory,
-    )

     (create_flags, create_positional_arguments, pattern_file, exclude_file) = (
         make_base_create_command(
@@ -535,9 +408,10 @@ def create_archive(
             repository_path,
             config,
             config_paths,
+            source_directories,
             local_borg_version,
             global_arguments,
-            borgmatic_runtime_directories,
+            borgmatic_runtime_directory,
             local_path,
             remote_path,
             progress,

+ 30 - 0
borgmatic/config/schema.yaml

@@ -2259,3 +2259,33 @@ properties:
             can send the logs to a self-hosted instance or create an account at
             https://grafana.com/auth/sign-up/create-user. See borgmatic
             monitoring documentation for details.
+
+    zfs:
+        type: object
+        required: ['enabled']
+        additionalProperties: false
+        properties:
+            enabled:
+                type: boolean
+                description: |
+                    Whether to auto-detect and snapshot any ZFS dataset mount
+                    points listed in "source_directories" when creating backups.
+                    Defaults to false.
+                example: true
+            zfs_command:
+                type: string
+                description: |
+                    Command to use instead of "zfs".
+                example: /usr/local/bin/zfs
+            mount_command:
+                type: string
+                description: |
+                    Command to use instead of "mount".
+                example: /usr/local/bin/mount
+            umount_command:
+                type: string
+                description: |
+                    Command to use instead of "umount".
+                example: /usr/local/bin/umount
+        description: |
+            Configuration for an integration with the ZFS filesystem.
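
For reference, a configuration using this hook might look like the following sketch (option names taken straight from the schema above):

    zfs:
        enabled: true
        zfs_command: /usr/local/bin/zfs
        mount_command: /usr/local/bin/mount
        umount_command: /usr/local/bin/umount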

+ 2 - 0
borgmatic/hooks/dispatch.py

@@ -16,6 +16,7 @@ from borgmatic.hooks import (
     sqlite,
     uptimekuma,
     zabbix,
+    zfs,
 )

 logger = logging.getLogger(__name__)
@@ -36,6 +37,7 @@ HOOK_NAME_TO_MODULE = {
     'sqlite_databases': sqlite,
     'uptime_kuma': uptimekuma,
     'zabbix': zabbix,
+    'zfs': zfs,
 }



+ 1 - 0
borgmatic/hooks/dump.py

@@ -11,6 +11,7 @@ DATA_SOURCE_HOOK_NAMES = (
     'mongodb_databases',
     'postgresql_databases',
     'sqlite_databases',
+    'zfs',
 )



+ 3 - 1
borgmatic/hooks/mariadb.py

@@ -122,7 +122,9 @@ def use_streaming(databases, config, log_prefix):
     return any(databases)


-def dump_data_sources(databases, config, log_prefix, borgmatic_runtime_directory, dry_run):
+def dump_data_sources(
+    databases, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
     '''
     Dump the given MariaDB databases to a named pipe. The databases are supplied as a sequence of
     dicts, one dict describing each database as per the configuration schema. Use the given

+ 3 - 1
borgmatic/hooks/mongodb.py

@@ -23,7 +23,9 @@ def use_streaming(databases, config, log_prefix):
     return any(database.get('format') != 'directory' for database in databases)


-def dump_data_sources(databases, config, log_prefix, borgmatic_runtime_directory, dry_run):
+def dump_data_sources(
+    databases, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
     '''
     Dump the given MongoDB databases to a named pipe. The databases are supplied as a sequence of
     dicts, one dict describing each database as per the configuration schema. Use the borgmatic

+ 3 - 1
borgmatic/hooks/mysql.py

@@ -121,7 +121,9 @@ def use_streaming(databases, config, log_prefix):
     return any(databases)


-def dump_data_sources(databases, config, log_prefix, borgmatic_runtime_directory, dry_run):
+def dump_data_sources(
+    databases, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
     '''
     Dump the given MySQL/MariaDB databases to a named pipe. The databases are supplied as a sequence
     of dicts, one dict describing each database as per the configuration schema. Use the given

+ 3 - 1
borgmatic/hooks/postgresql.py

@@ -104,7 +104,9 @@ def use_streaming(databases, config, log_prefix):
     return any(database.get('format') != 'directory' for database in databases)


-def dump_data_sources(databases, config, log_prefix, borgmatic_runtime_directory, dry_run):
+def dump_data_sources(
+    databases, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
     '''
     Dump the given PostgreSQL databases to a named pipe. The databases are supplied as a sequence of
     dicts, one dict describing each database as per the configuration schema. Use the given

+ 3 - 1
borgmatic/hooks/sqlite.py

@@ -24,7 +24,9 @@ def use_streaming(databases, config, log_prefix):
     return any(databases)


-def dump_data_sources(databases, config, log_prefix, borgmatic_runtime_directory, dry_run):
+def dump_data_sources(
+    databases, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
     '''
     Dump the given SQLite databases to a named pipe. The databases are supplied as a sequence of
     configuration dicts, as per the configuration schema. Use the given borgmatic runtime directory

+ 224 - 0
borgmatic/hooks/zfs.py

@@ -0,0 +1,224 @@
+import logging
+import os
+import shlex
+
+import borgmatic.config.paths
+import borgmatic.execute
+
+logger = logging.getLogger(__name__)
+
+
+def use_streaming(hook_config, config, log_prefix):
+    '''
+    Return whether dump streaming is used for this hook. (Spoiler: It isn't.)
+    '''
+    return False
+
+
+BORGMATIC_SNAPSHOT_PREFIX = 'borgmatic-'
+
+
+def dump_data_sources(
+    hook_config, config, log_prefix, borgmatic_runtime_directory, source_directories, dry_run
+):
+    '''
+    Given a ZFS configuration dict, a configuration dict, a log prefix, the borgmatic runtime
+    directory, the configured source directories, and whether this is a dry run, auto-detect and
+    snapshot any ZFS dataset mount points within the given source directories. Also update those
+    source directories, replacing dataset mount points with corresponding snapshot directories. Use
+    the log prefix in any log entries.
+
+    Return an empty sequence of subprocess.Popen instances, since there are no ongoing dump
+    processes.
+
+    If this is a dry run or ZFS isn't enabled, then don't actually snapshot anything.
+    '''
+    dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
+
+    logger.info(f'{log_prefix}: Snapshotting ZFS datasets{dry_run_label}')
+
+    # TODO: Check for ZFS enabled in config and skip accordingly.
+    # TODO: Check for Borg 1.4+ and error if Borg is too old (because we need the slashdot hack).
+    # TODO: Dry run.
+
+    # List ZFS datasets to get their mount points.
+    zfs_command = config.get('zfs_command', 'zfs')
+    list_command = (
+        zfs_command,
+        'list',
+        '-H',
+        '-t',
+        'filesystem',
+        '-o',
+        'name,mountpoint',
+    )
+    list_output = borgmatic.execute.execute_command_and_capture_output(list_command)
+    mount_point_to_dataset_name = {
+        mount_point: dataset_name
+        for line in list_output.splitlines()
+        for (dataset_name, mount_point) in (line.rstrip().split('\t'),)
+    }
+
+    # Find the intersection between those mount points and the configured borgmatic source
+    # directories, the idea being that these are the requested datasets to snapshot.
+    requested_mount_point_to_dataset_name = {
+        source_directory: dataset_name
+        for source_directory in source_directories
+        for dataset_name in (mount_point_to_dataset_name.get(source_directory),)
+        if dataset_name
+    }
+
+    # TODO: Also maybe support datasets with property torsion.org.borgmatic:backup even if not
+    # listed in source directories?
+
+    # Snapshot each dataset, rewriting source directories to use the snapshot paths.
+    snapshot_paths = []
+    snapshot_name = f'{BORGMATIC_SNAPSHOT_PREFIX}{os.getpid()}'
+
+    for mount_point, dataset_name in requested_mount_point_to_dataset_name.items():
+        full_snapshot_name = f'{dataset_name}@{snapshot_name}'
+        logger.debug(f'{log_prefix}: Creating ZFS snapshot {full_snapshot_name}')
+
+        borgmatic.execute.execute_command(
+            (
+                zfs_command,
+                'snapshot',
+                '-r',
+                full_snapshot_name,
+            ),
+            output_log_level=logging.DEBUG,
+        )
+
+        # Mount the snapshot into a particular named temporary directory so that the snapshot ends
+        # up in the Borg archive at the "original" dataset mount point path.
+        snapshot_path = os.path.join(
+            # TODO: Maybe factor out into normalize_runtime_directory() utility function.
+            *(
+                subdirectory
+                for subdirectory in borgmatic_runtime_directory.split(os.path.sep)
+                if subdirectory != '.'
+            ),
+            'zfs_snapshots',
+            '.',
+            mount_point.lstrip(os.path.sep),
+        )
+        logger.debug(f'{log_prefix}: Mounting ZFS snapshot {full_snapshot_name} at {snapshot_path}')
+
+        os.makedirs(snapshot_path, mode=0o700, exist_ok=True)
+
+        borgmatic.execute.execute_command(
+            (
+                config.get('mount_command', 'mount'),
+                '-t',
+                'zfs',
+                f'{dataset_name}@{snapshot_name}',
+                snapshot_path,
+            ),
+            output_log_level=logging.DEBUG,
+        )
+
+        source_directories.remove(mount_point)
+        source_directories.append(snapshot_path)
+
+    return []
+
+
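The '.' component in the mount path is what makes this work: it's the Borg 1.4 "slashdot" syntax (the same feature the TODO above refers to), which strips everything up to the '/./' from archived paths. A sketch with made-up values:

    import os

    borgmatic_runtime_directory = '/run/user/0/borgmatic'
    mount_point = '/var/data'
    snapshot_path = os.path.join(
        *(part for part in borgmatic_runtime_directory.split(os.path.sep) if part != '.'),
        'zfs_snapshots',
        '.',
        mount_point.lstrip(os.path.sep),
    )
    # -> 'run/user/0/borgmatic/zfs_snapshots/./var/data'
    # So the snapshot's files end up in the archive under their original
    # 'var/data' path, even though they're read from the runtime directory.
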
+def remove_data_source_dumps(hook_config, config, log_prefix, borgmatic_runtime_directory, dry_run):
+    '''
+    Given a ZFS configuration dict, a configuration dict, a log prefix, the borgmatic runtime
+    directory, and whether this is a dry run, unmount and destroy any ZFS snapshots created by
+    borgmatic. Use the log prefix in any log entries. If this is a dry run or ZFS isn't enabled,
+    then don't actually remove anything.
+    '''
+    # TODO: Dry run.
+
+    # Unmount snapshots.
+    zfs_command = config.get('zfs_command', 'zfs')
+    list_datasets_command = (
+        zfs_command,
+        'list',
+        '-H',
+        '-o',
+        'name,mountpoint',
+    )
+    list_datasets_output = borgmatic.execute.execute_command_and_capture_output(
+        list_datasets_command
+    )
+
+    mount_points = tuple(
+        mount_point
+        for line in list_datasets_output.splitlines()
+        for (dataset_name, mount_point) in (line.rstrip().split('\t'),)
+    )
+    # FIXME: This doesn't necessarily find snapshot mounts from previous borgmatic runs, because
+    # borgmatic_runtime_directory could be in a tempfile-created directory that has a random name.
+    snapshots_directory = os.path.join(
+        *(
+            subdirectory
+            for subdirectory in borgmatic_runtime_directory.split(os.path.sep)
+            if subdirectory != '.'
+        ),
+        'zfs_snapshots',
+    )
+    logger.debug(f'{log_prefix}: Looking for snapshots in {snapshots_directory}')
+
+    if os.path.isdir(snapshots_directory):
+        for mount_point in mount_points:
+            snapshot_path = os.path.join(snapshots_directory, mount_point.lstrip(os.path.sep))
+            logger.debug(f'{log_prefix}: Unmounting ZFS snapshot at {snapshot_path}')
+            borgmatic.execute.execute_command(
+                (
+                    config.get('umount_command', 'umount'),
+                    snapshot_path,
+                ),
+                output_log_level=logging.DEBUG,
+            )
+
+    # Destroy snapshots.
+    list_snapshots_command = (
+        zfs_command,
+        'list',
+        '-H',
+        '-t',
+        'snapshot',
+        '-o',
+        'name',
+    )
+    list_snapshots_output = borgmatic.execute.execute_command_and_capture_output(
+        list_snapshots_command
+    )
+
+    for line in list_snapshots_output.splitlines():
+        full_snapshot_name = line.rstrip()
+        logger.debug(f'{log_prefix}: Destroying ZFS snapshot {full_snapshot_name}')
+
+        # Only destroy snapshots that borgmatic actually created!
+        if not full_snapshot_name.split('@')[-1].startswith(BORGMATIC_SNAPSHOT_PREFIX):
+            continue
+
+        borgmatic.execute.execute_command(
+            (
+                zfs_command,
+                'destroy',
+                '-r',
+                full_snapshot_name,
+            ),
+            output_log_level=logging.DEBUG,
+        )
+
+
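The ownership check above keys off the snapshot name after the '@'. A quick sketch with sample "zfs list -t snapshot" names:

    for name in ('pool/data@borgmatic-12345', 'pool/data@manual-backup'):
        print(name, name.split('@')[-1].startswith(BORGMATIC_SNAPSHOT_PREFIX))
    # pool/data@borgmatic-12345 True   <- destroyed
    # pool/data@manual-backup False    <- left alone
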
+def make_data_source_dump_patterns(hook_config, config, log_prefix, name=None):  # pragma: no cover
+    '''
+    Restores aren't implemented, because stored files can be extracted directly with "extract".
+    '''
+    raise NotImplementedError()
+
+
+def restore_data_source_dump(
+    hook_config, config, log_prefix, data_source, dry_run, extract_process, connection_params
+):  # pragma: no cover
+    '''
+    Restores aren't implemented, because stored files can be extracted directly with "extract".
+    '''
+    raise NotImplementedError()