123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import fnmatch
- import json
- import logging
- import os
- import shutil
- import borgmatic.actions.restore
# Module-level logger named after this module, per standard logging convention.
logger = logging.getLogger(__name__)

# NOTE(review): presumably tells borgmatic's hook discovery that this module is shared dump
# machinery rather than a data source hook itself — confirm against the hook loader.
IS_A_HOOK = False
def make_data_source_dump_path(borgmatic_runtime_directory, data_source_hook_name):
    '''
    Construct the dump directory path for the given data source hook name within the given
    borgmatic runtime directory.
    '''
    dump_path = os.path.join(borgmatic_runtime_directory, data_source_hook_name)

    return dump_path
def make_data_source_dump_filename(dump_path, name, hostname=None, port=None):
    '''
    Return a filename to use for a data source dump, built from the given dump directory path,
    data source name, hostname, and port. The hostname defaults to localhost.

    Raise ValueError if the data source name is invalid.
    '''
    # A path separator in the name could escape the dump directory, so reject it.
    if os.path.sep in name:
        raise ValueError(f'Invalid data source name {name}')

    host_component = hostname if hostname else 'localhost'

    if port is not None:
        host_component += f':{port}'

    return os.path.join(dump_path, host_component, name)
def write_data_source_dumps_metadata(borgmatic_runtime_directory, hook_name, dumps_metadata):
    '''
    Given the borgmatic runtime directory, a data source hook name, and a sequence of
    borgmatic.actions.restore.Dump instances of dump metadata, write a metadata file describing all
    of those dumps. This metadata is being dumped so that it's available upon restore, e.g. to
    support the user selecting which data source(s) should be restored.

    Raise ValueError if the metadata file can't be written.
    '''
    dumps_metadata_path = os.path.join(borgmatic_runtime_directory, hook_name, 'dumps.json')

    try:
        with open(dumps_metadata_path, 'w') as metadata_file:
            # sort_keys makes the output deterministic, so repeated dumps of the same metadata
            # produce byte-identical files.
            json.dump([dump._asdict() for dump in dumps_metadata], metadata_file, sort_keys=True)
    except OSError as error:
        # Chain the original error so the underlying cause isn't lost from the traceback.
        raise ValueError(
            f'Error writing to dumps metadata at {dumps_metadata_path}: {error}'
        ) from error
def parse_data_source_dumps_metadata(dumps_json, dumps_metadata_path):
    '''
    Given a dumps metadata JSON string as extracted from an archive and the archive path it came
    from (used only for error messages), parse it into a tuple of borgmatic.actions.restore.Dump
    instances and return them.

    Raise ValueError if the JSON is malformed or doesn't match the fields that Dump expects.
    '''
    try:
        # TypeError covers JSON that parses fine but whose objects don't match Dump's fields.
        return tuple(borgmatic.actions.restore.Dump(**dump) for dump in json.loads(dumps_json))
    except (json.JSONDecodeError, TypeError) as error:
        # Chain the original error so the underlying cause isn't lost from the traceback.
        raise ValueError(
            f'Cannot read archive data source dumps metadata at {dumps_metadata_path} due to invalid JSON: {error}',
        ) from error
def create_parent_directory_for_dump(dump_path):
    '''
    Ensure that the directory meant to contain the given dump path exists, creating it
    (owner-only permissions) if necessary.
    '''
    parent_directory = os.path.dirname(dump_path)

    os.makedirs(parent_directory, mode=0o700, exist_ok=True)
def create_named_pipe_for_dump(dump_path):
    '''
    Create a named pipe (FIFO) at the given dump path, first making sure its parent directory
    exists. The pipe itself is created with owner-only permissions.
    '''
    create_parent_directory_for_dump(dump_path)

    os.mkfifo(dump_path, mode=0o600)
def remove_data_source_dumps(dump_path, data_source_type_name, dry_run):
    '''
    Remove all data source dumps in the given dump directory path (including the directory itself).
    If this is a dry run, then don't actually remove anything.
    '''
    if dry_run:
        dry_run_label = ' (dry run; not actually removing anything)'
    else:
        dry_run_label = ''

    logger.debug(f'Removing {data_source_type_name} data source dumps{dry_run_label}')

    # Only touch the filesystem when this isn't a dry run and there's something to remove.
    if not dry_run and os.path.exists(dump_path):
        shutil.rmtree(dump_path)
def convert_glob_patterns_to_borg_pattern(patterns):
    '''
    Convert a sequence of shell glob patterns like "/etc/*", "/tmp/*" to the corresponding Borg
    regular expression archive pattern as a single string like "re:etc/.*|tmp/.*".
    '''
    regular_expressions = []

    for pattern in patterns:
        # Strip any leading slash, since Borg archive paths are relative.
        translated = fnmatch.translate(pattern.lstrip('/'))

        # Remove the "\Z" generated by fnmatch.translate() because we don't want the pattern to
        # match only at the end of a path, as directory format dumps require extracting files with
        # paths longer than the pattern. E.g., a pattern of "borgmatic/*/foo_databases/test"
        # should also match paths like "borgmatic/*/foo_databases/test/toc.dat".
        regular_expressions.append(translated.replace('\\Z', ''))

    return 're:' + '|'.join(regular_expressions)
|