123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- import pytest
- from flexmock import flexmock
- from borgmatic.actions.config import bootstrap as module
- def test_make_bootstrap_config_uses_ssh_command_argument():
- ssh_command = flexmock()
- config = module.make_bootstrap_config(flexmock(ssh_command=ssh_command))
- assert config['ssh_command'] == ssh_command
- assert config['relocated_repo_access_is_ok']
- def test_get_config_paths_returns_list_of_config_paths():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg8',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(
- read=lambda: '{"config_paths": ["/borgmatic/config.yaml"]}',
- ),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- extract_process
- )
- assert module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- ) == ['/borgmatic/config.yaml']
- def test_get_config_paths_probes_for_manifest():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg8',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- borgmatic_runtime_directory = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- borgmatic_runtime_directory,
- )
- flexmock(module.os.path).should_receive('join').with_args(
- 'borgmatic', 'bootstrap', 'manifest.json'
- ).and_return(flexmock()).once()
- flexmock(module.os.path).should_receive('join').with_args(
- borgmatic_runtime_directory, 'bootstrap', 'manifest.json'
- ).and_return(flexmock()).once()
- flexmock(module.os.path).should_receive('join').with_args(
- '/source', 'bootstrap', 'manifest.json'
- ).and_return(flexmock()).once()
- manifest_missing_extract_process = flexmock(
- stdout=flexmock(read=lambda: None),
- )
- manifest_found_extract_process = flexmock(
- stdout=flexmock(
- read=lambda: '{"config_paths": ["/borgmatic/config.yaml"]}',
- ),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- manifest_missing_extract_process
- ).and_return(manifest_missing_extract_process).and_return(manifest_found_extract_process)
- assert module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- ) == ['/borgmatic/config.yaml']
- def test_get_config_paths_translates_ssh_command_argument_to_config():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- config = flexmock()
- flexmock(module).should_receive('make_bootstrap_config').and_return(config)
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command='ssh -i key',
- local_path='borg7',
- remote_path='borg8',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(
- read=lambda: '{"config_paths": ["/borgmatic/config.yaml"]}',
- ),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').with_args(
- False,
- 'repo',
- 'archive',
- object,
- config,
- object,
- object,
- extract_to_stdout=True,
- local_path='borg7',
- remote_path='borg8',
- ).and_return(extract_process)
- assert module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- ) == ['/borgmatic/config.yaml']
- def test_get_config_paths_with_missing_manifest_raises_value_error():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg7',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- flexmock(module.os.path).should_receive('join').and_return(flexmock())
- extract_process = flexmock(stdout=flexmock(read=lambda: ''))
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- extract_process
- )
- with pytest.raises(ValueError):
- module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- )
- def test_get_config_paths_with_broken_json_raises_value_error():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg7',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(read=lambda: '{"config_paths": ["/oops'),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- extract_process
- )
- with pytest.raises(ValueError):
- module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- )
- def test_get_config_paths_with_json_missing_key_raises_value_error():
- flexmock(module.borgmatic.config.paths).should_receive(
- 'get_borgmatic_source_directory'
- ).and_return('/source')
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg7',
- borgmatic_source_directory=None,
- user_runtime_directory=None,
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(read=lambda: '{}'),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- extract_process
- )
- with pytest.raises(ValueError):
- module.get_config_paths(
- 'archive', bootstrap_arguments, global_arguments, local_borg_version
- )
- def test_run_bootstrap_does_not_raise():
- flexmock(module).should_receive('make_bootstrap_config').and_return({})
- flexmock(module).should_receive('get_config_paths').and_return(['/borgmatic/config.yaml'])
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- destination='dest',
- strip_components=1,
- progress=False,
- user_runtime_directory='/borgmatic',
- ssh_command=None,
- local_path='borg7',
- remote_path='borg8',
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(
- read=lambda: '{"config_paths": ["borgmatic/config.yaml"]}',
- ),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').and_return(
- extract_process
- ).once()
- flexmock(module.borgmatic.borg.repo_list).should_receive('resolve_archive_name').and_return(
- 'archive'
- )
- module.run_bootstrap(bootstrap_arguments, global_arguments, local_borg_version)
- def test_run_bootstrap_translates_ssh_command_argument_to_config():
- config = flexmock()
- flexmock(module).should_receive('make_bootstrap_config').and_return(config)
- flexmock(module).should_receive('get_config_paths').and_return(['/borgmatic/config.yaml'])
- bootstrap_arguments = flexmock(
- repository='repo',
- archive='archive',
- destination='dest',
- strip_components=1,
- progress=False,
- user_runtime_directory='/borgmatic',
- ssh_command='ssh -i key',
- local_path='borg7',
- remote_path='borg8',
- )
- global_arguments = flexmock(
- dry_run=False,
- )
- local_borg_version = flexmock()
- flexmock(module.borgmatic.config.paths).should_receive('Runtime_directory').and_return(
- flexmock()
- )
- extract_process = flexmock(
- stdout=flexmock(
- read=lambda: '{"config_paths": ["borgmatic/config.yaml"]}',
- ),
- )
- flexmock(module.borgmatic.borg.extract).should_receive('extract_archive').with_args(
- False,
- 'repo',
- 'archive',
- object,
- config,
- object,
- object,
- extract_to_stdout=False,
- destination_path='dest',
- strip_components=1,
- progress=False,
- local_path='borg7',
- remote_path='borg8',
- ).and_return(extract_process).once()
- flexmock(module.borgmatic.borg.repo_list).should_receive('resolve_archive_name').with_args(
- 'repo',
- 'archive',
- config,
- object,
- object,
- local_path='borg7',
- remote_path='borg8',
- ).and_return('archive')
- module.run_bootstrap(bootstrap_arguments, global_arguments, local_borg_version)
|