Преглед на файлове

Retry failing backups (#28, #432).

Reviewed-on: https://projects.torsion.org/borgmatic-collective/borgmatic/pulls/432
Dan Helfman преди 3 години
родител
ревизия
180018fd81
променени са 3 файла, в които са добавени 188 реда и са изтрити 4 реда
  1. 21 4
      borgmatic/commands/borgmatic.py
  2. 12 0
      borgmatic/config/schema.yaml
  3. 155 0
      tests/unit/commands/test_borgmatic.py

+ 21 - 4
borgmatic/commands/borgmatic.py

@@ -4,6 +4,8 @@ import json
 import logging
 import os
 import sys
+import time
+from queue import Queue
 from subprocess import CalledProcessError
 
 import colorama
@@ -52,6 +54,8 @@ def run_configuration(config_filename, config, arguments):
 
     local_path = location.get('local_path', 'borg')
     remote_path = location.get('remote_path')
+    retries = storage.get('retries', 0)
+    retry_timeout = storage.get('retry_timeout', 0)
     borg_environment.initialize(storage)
     encountered_error = None
     error_repository = ''
@@ -120,7 +124,16 @@ def run_configuration(config_filename, config, arguments):
         )
 
     if not encountered_error:
-        for repository_path in location['repositories']:
+        repo_queue = Queue()
+        for repo in location['repositories']:
+            repo_queue.put((repo, 0),)
+
+        while not repo_queue.empty():
+            repository_path, retry_num = repo_queue.get()
+            timeout = retry_num * retry_timeout
+            if timeout:
+                logger.warning(f'Sleeping {timeout}s before next retry')
+                time.sleep(timeout)
             try:
                 yield from run_actions(
                     arguments=arguments,
@@ -134,11 +147,15 @@ def run_configuration(config_filename, config, arguments):
                     repository_path=repository_path,
                 )
             except (OSError, CalledProcessError, ValueError) as error:
-                encountered_error = error
-                error_repository = repository_path
                 yield from make_error_log_records(
                     '{}: Error running actions for repository'.format(repository_path), error
                 )
+                if retry_num < retries:
+                    repo_queue.put((repository_path, retry_num + 1),)
+                    logger.warning(f'Retrying.. attempt {retry_num + 1}/{retries}')
+                    continue
+                encountered_error = error
+                error_repository = repository_path
 
     if not encountered_error:
         try:
@@ -257,7 +274,7 @@ def run_actions(
     hooks,
     local_path,
     remote_path,
-    repository_path
+    repository_path,
 ):  # pragma: no cover
     '''
     Given parsed command-line arguments as an argparse.ArgumentParser instance, several different

+ 12 - 0
borgmatic/config/schema.yaml

@@ -251,6 +251,18 @@ properties:
                     Remote network upload rate limit in kiBytes/second. Defaults
                     to unlimited.
                 example: 100
+            retries:
+                type: integer
+                description: |
+                    Number of times to retry a backup before failing. Defaults
+                    to 0 (i.e. does not attempt retry).
+                example: 3
+            retry_timeout:
+                type: integer
+                description: |
+                    Wait time between retries (in seconds), to allow transient
+                    issues to pass. Defaults to 0s.
+                example: 10
             temporary_directory:
                 type: string
                 description: |

+ 155 - 0
tests/unit/commands/test_borgmatic.py

@@ -1,5 +1,6 @@
 import logging
 import subprocess
+import time
 
 from flexmock import flexmock
 
@@ -184,6 +185,160 @@ def test_run_configuration_bails_for_on_error_hook_soft_failure():
     assert results == expected_results
 
 
+def test_run_retries_soft_error():
+    # Run action first fails, second passes
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).and_return([])
+    expected_results = [flexmock()]
+    flexmock(module).should_receive('make_error_log_records').and_return(expected_results).once()
+    config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 1}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_retries_hard_error():
+    # Run action fails twice
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).times(2)
+    expected_results = [flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[:1]).with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(
+        expected_results[1:]
+    ).twice()
+    config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 1}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_repos_ordered():
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).times(2)
+    expected_results = [flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[:1]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[1:]).ordered()
+    config = {'location': {'repositories': ['foo', 'bar']}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_retries_round_robbin():
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).times(4)
+    expected_results = [flexmock(), flexmock(), flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[0:1]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[1:2]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[2:3]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[3:4]).ordered()
+    config = {'location': {'repositories': ['foo', 'bar']}, 'storage': {'retries': 1}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_retries_one_passes():
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).and_raise(OSError).and_return(
+        []
+    ).and_raise(OSError).times(4)
+    expected_results = [flexmock(), flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[0:1]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[1:2]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[2:3]).ordered()
+    config = {'location': {'repositories': ['foo', 'bar']}, 'storage': {'retries': 1}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_retry_timeout():
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).times(4)
+    expected_results = [flexmock(), flexmock(), flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[0:1]).ordered()
+
+    flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[1:2]).ordered()
+
+    flexmock(time).should_receive('sleep').with_args(20).and_return().ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[2:3]).ordered()
+
+    flexmock(time).should_receive('sleep').with_args(30).and_return().ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[3:4]).ordered()
+    config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 3, 'retry_timeout': 10}}
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
+def test_run_retries_timeout_multiple_repos():
+    flexmock(module.borg_environment).should_receive('initialize')
+    flexmock(module.command).should_receive('execute_hook')
+    flexmock(module).should_receive('run_actions').and_raise(OSError).and_raise(OSError).and_return(
+        []
+    ).and_raise(OSError).times(4)
+    expected_results = [flexmock(), flexmock(), flexmock()]
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'foo: Error running actions for repository', OSError
+    ).and_return(expected_results[0:1]).ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[1:2]).ordered()
+
+    # Sleep before retrying foo (and passing)
+    flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
+
+    # Sleep before retrying bar (and failing)
+    flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
+    flexmock(module).should_receive('make_error_log_records').with_args(
+        'bar: Error running actions for repository', OSError
+    ).and_return(expected_results[2:3]).ordered()
+    config = {
+        'location': {'repositories': ['foo', 'bar']},
+        'storage': {'retries': 1, 'retry_timeout': 10},
+    }
+    arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
+    results = list(module.run_configuration('test.yaml', config, arguments))
+    assert results == expected_results
+
+
 def test_load_configurations_collects_parsed_configurations():
     configuration = flexmock()
     other_configuration = flexmock()