Explorar el Código

Add unit tests for new credential hooks.

Dan Helfman hace 4 meses
padre
commit
5dda9c8ee5

+ 2 - 5
borgmatic/hooks/credential/file.py

@@ -1,8 +1,5 @@
 import logging
 import os
-import shlex
-
-import borgmatic.execute
 
 logger = logging.getLogger(__name__)
 
@@ -18,7 +15,7 @@ def load_credential(hook_config, config, credential_parameters):
     try:
         (credential_path,) = credential_parameters
     except ValueError:
-        raise ValueError(f'Cannot load credential with invalid credential path: "{' '.join(credential_parameters)}"')
+        raise ValueError(f'Cannot load invalid credential: "{' '.join(credential_parameters)}"')
 
     try:
         with open(credential_path) as credential_file:
@@ -26,4 +23,4 @@ def load_credential(hook_config, config, credential_parameters):
     except (FileNotFoundError, OSError) as error:
         logger.warning(error)
 
-        raise ValueError(f'Cannot load credential "{credential_name}" from file: {error.filename}')
+        raise ValueError(f'Cannot load credential file: {error.filename}')

+ 6 - 3
borgmatic/hooks/credential/keepassxc.py

@@ -1,6 +1,5 @@
 import logging
 import os
-import shlex
 
 import borgmatic.execute
 
@@ -18,10 +17,14 @@ def load_credential(hook_config, config, credential_parameters):
     try:
         (database_path, attribute_name) = credential_parameters
     except ValueError:
-        raise ValueError(f'Cannot load credential with invalid KeePassXC database path and attribute name: "{' '.join(credential_parameters)}"')
+        raise ValueError(
+            f'Cannot load credential with invalid KeePassXC database path and attribute name: "{' '.join(credential_parameters)}"'
+        )
 
     if not os.path.exists(database_path):
-        raise ValueError(f'Cannot load credential because KeePassXC database path does not exist: {database_path}')
+        raise ValueError(
+            f'Cannot load credential because KeePassXC database path does not exist: {database_path}'
+        )
 
     return borgmatic.execute.execute_command_and_capture_output(
         (

+ 5 - 7
borgmatic/hooks/credential/parse.py

@@ -7,9 +7,7 @@ import borgmatic.hooks.dispatch
 IS_A_HOOK = False
 
 
-CREDENTIAL_PATTERN = re.compile(
-    r'\{credential( +(?P<contents>.*))?\}'
-)
+CREDENTIAL_PATTERN = re.compile(r'\{credential( +(?P<hook_and_parameters>.*))?\}')
 
 
 @functools.cache
@@ -31,12 +29,12 @@ def resolve_credential(value):
     if not matcher:
         return value
 
-    contents = matcher.group('contents')
-    
-    if not contents:
+    hook_and_parameters = matcher.group('hook_and_parameters')
+
+    if not hook_and_parameters:
         raise ValueError(f'Cannot load credential with invalid syntax "{value}"')
 
-    (hook_name, *credential_parameters) = shlex.split(contents)
+    (hook_name, *credential_parameters) = shlex.split(hook_and_parameters)
 
     if not credential_parameters:
         raise ValueError(f'Cannot load credential with invalid syntax "{value}"')

+ 3 - 1
borgmatic/hooks/credential/systemd.py

@@ -20,7 +20,9 @@ def load_credential(hook_config, config, credential_parameters):
     try:
         (credential_name,) = credential_parameters
     except ValueError:
-        raise ValueError(f'Cannot load invalid credential name: "{' '.join(credential_parameters)}"')
+        raise ValueError(
+            f'Cannot load invalid credential name: "{' '.join(credential_parameters)}"'
+        )
 
     credentials_directory = os.environ.get('CREDENTIALS_DIRECTORY')
 

+ 42 - 0
tests/unit/hooks/credential/test_container.py

@@ -0,0 +1,42 @@
+import io
+import sys
+
+import pytest
+from flexmock import flexmock
+
+from borgmatic.hooks.credential import container as module
+
+
+@pytest.mark.parametrize('credential_parameters', ((), ('foo', 'bar')))
+def test_load_credential_with_invalid_credential_parameters_raises(credential_parameters):
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=credential_parameters
+        )
+
+
+def test_load_credential_with_invalid_secret_name_raises():
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('this is invalid',)
+        )
+
+
+def test_load_credential_reads_named_secret_from_file():
+    credential_stream = io.StringIO('password')
+    credential_stream.name = '/run/secrets/mysecret'
+    builtins = flexmock(sys.modules['builtins'])
+    builtins.should_receive('open').with_args('/run/secrets/mysecret').and_return(credential_stream)
+
+    assert (
+        module.load_credential(hook_config={}, config={}, credential_parameters=('mysecret',))
+        == 'password'
+    )
+
+
+def test_load_credential_with_file_not_found_error_raises():
+    builtins = flexmock(sys.modules['builtins'])
+    builtins.should_receive('open').with_args('/run/secrets/mysecret').and_raise(FileNotFoundError)
+
+    with pytest.raises(ValueError):
+        module.load_credential(hook_config={}, config={}, credential_parameters=('mysecret',))

+ 50 - 0
tests/unit/hooks/credential/test_file.py

@@ -0,0 +1,50 @@
+import io
+import sys
+
+import pytest
+from flexmock import flexmock
+
+from borgmatic.hooks.credential import file as module
+
+
+@pytest.mark.parametrize('credential_parameters', ((), ('foo', 'bar')))
+def test_load_credential_with_invalid_credential_parameters_raises(credential_parameters):
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=credential_parameters
+        )
+
+
+def test_load_credential_with_invalid_credential_name_raises():
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('this is invalid',)
+        )
+
+
+def test_load_credential_reads_named_credential_from_file():
+    credential_stream = io.StringIO('password')
+    credential_stream.name = '/credentials/mycredential'
+    builtins = flexmock(sys.modules['builtins'])
+    builtins.should_receive('open').with_args('/credentials/mycredential').and_return(
+        credential_stream
+    )
+
+    assert (
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('/credentials/mycredential',)
+        )
+        == 'password'
+    )
+
+
+def test_load_credential_with_file_not_found_error_raises():
+    builtins = flexmock(sys.modules['builtins'])
+    builtins.should_receive('open').with_args('/credentials/mycredential').and_raise(
+        FileNotFoundError
+    )
+
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('/credentials/mycredential',)
+        )

+ 38 - 0
tests/unit/hooks/credential/test_keepassxc.py

@@ -0,0 +1,38 @@
+import pytest
+from flexmock import flexmock
+
+from borgmatic.hooks.credential import keepassxc as module
+
+
+@pytest.mark.parametrize('credential_parameters', ((), ('foo',), ('foo', 'bar', 'baz')))
+def test_load_credential_with_invalid_credential_parameters_raises(credential_parameters):
+    flexmock(module.borgmatic.execute).should_receive('execute_command_and_capture_output').never()
+
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=credential_parameters
+        )
+
+
+def test_load_credential_with_missing_database_raises():
+    flexmock(module.os.path).should_receive('exists').and_return(False)
+    flexmock(module.borgmatic.execute).should_receive('execute_command_and_capture_output').never()
+
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('database.kdbx', 'mypassword')
+        )
+
+
+def test_load_credential_with_present_database_fetches_password_from_keepassxc():
+    flexmock(module.os.path).should_receive('exists').and_return(True)
+    flexmock(module.borgmatic.execute).should_receive(
+        'execute_command_and_capture_output'
+    ).and_return('password').once()
+
+    assert (
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('database.kdbx', 'mypassword')
+        )
+        == 'password'
+    )

+ 13 - 1
tests/unit/hooks/credential/test_systemd.py

@@ -7,6 +7,16 @@ from flexmock import flexmock
 from borgmatic.hooks.credential import systemd as module
 
 
+@pytest.mark.parametrize('credential_parameters', ((), ('foo', 'bar')))
+def test_load_credential_with_invalid_credential_parameters_raises(credential_parameters):
+    flexmock(module.os.environ).should_receive('get').never()
+
+    with pytest.raises(ValueError):
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=credential_parameters
+        )
+
+
 def test_load_credential_without_credentials_directory_raises():
     flexmock(module.os.environ).should_receive('get').with_args('CREDENTIALS_DIRECTORY').and_return(
         None
@@ -22,7 +32,9 @@ def test_load_credential_with_invalid_credential_name_raises():
     )
 
     with pytest.raises(ValueError):
-        module.load_credential(hook_config={}, config={}, credential_parameters=('../../my!@#$credential',))
+        module.load_credential(
+            hook_config={}, config={}, credential_parameters=('../../my!@#$credential',)
+        )
 
 
 def test_load_credential_reads_named_credential_from_file():