瀏覽代碼

Fix hang when restoring a PostgreSQL "tar" format database dump (#430).

Dan Helfman 4 年之前
父節點
當前提交
1709f57ff0
共有 3 個文件被更改,包括 50 次插入5 次删除
  1. 1 0
      NEWS
  2. 16 4
      borgmatic/execute.py
  3. 33 1
      tests/integration/test_execute.py

+ 1 - 0
NEWS

@@ -1,6 +1,7 @@
 1.5.16.dev0
  * #379: Suppress console output in sample crontab and systemd service files.
  * #407: Fix syslog logging on FreeBSD.
+ * #430: Fix hang when restoring a PostgreSQL "tar" format database dump.
  * Better error messages! Switch the library used for validating configuration files (from pykwalify
    to jsonschema).
  * Link borgmatic Ansible role from installation documentation:

+ 16 - 4
borgmatic/execute.py

@@ -59,11 +59,12 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path):
     '''
     # Map from output buffer to sequence of last lines.
     buffer_last_lines = collections.defaultdict(list)
-    output_buffers = [
-        output_buffer_for_process(process, exclude_stdouts)
+    process_for_output_buffer = {
+        output_buffer_for_process(process, exclude_stdouts): process
         for process in processes
         if process.stdout or process.stderr
-    ]
+    }
+    output_buffers = list(process_for_output_buffer.keys())
 
     # Log output for each process until they all exit.
     while True:
@@ -71,8 +72,19 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path):
             (ready_buffers, _, _) = select.select(output_buffers, [], [])
 
             for ready_buffer in ready_buffers:
+                ready_process = process_for_output_buffer.get(ready_buffer)
+
+                # The "ready" process has exited, but it might be a pipe destination with other
+                # processes (pipe sources) waiting to be read from. So as a measure to prevent
+                # hangs, vent all processes when one exits.
+                if ready_process and ready_process.poll() is not None:
+                    for other_process in processes:
+                        if other_process.poll() is None:
+                            # Add the process's output to output_buffers to ensure it'll get read.
+                            output_buffers.append(other_process.stdout)
+
                 line = ready_buffer.readline().rstrip().decode()
-                if not line:
+                if not line or not ready_process:
                     continue
 
                 # Keep the last few lines of output in case the process errors, and we need the output for

+ 33 - 1
tests/integration/test_execute.py

@@ -98,7 +98,7 @@ def test_log_outputs_kills_other_processes_when_one_errors():
         process, 2, 'borg'
     ).and_return(True)
     other_process = subprocess.Popen(
-        ['watch', 'true'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+        ['sleep', '2'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
     )
     flexmock(module).should_receive('exit_code_indicates_error').with_args(
         other_process, None, 'borg'
@@ -123,6 +123,38 @@ def test_log_outputs_kills_other_processes_when_one_errors():
     assert error.value.output
 
 
+def test_log_outputs_vents_other_processes_when_one_exits():
+    '''
+    Execute a command to generate a longish random string and pipe it into another command that
+    exits quickly. The test is basically to ensure we don't hang forever waiting for the exited
+    process to read the pipe, and that the string-generating process eventually gets vented and
+    exits.
+    '''
+    flexmock(module.logger).should_receive('log')
+    flexmock(module).should_receive('command_for_process').and_return('grep')
+
+    process = subprocess.Popen(
+        ['shuf', '-zer', '-n10000', '{A..Z}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE
+    )
+    other_process = subprocess.Popen(
+        ['true'], stdin=process.stdout, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+    )
+    flexmock(module).should_receive('output_buffer_for_process').with_args(
+        process, (process.stdout,)
+    ).and_return(process.stderr)
+    flexmock(module).should_receive('output_buffer_for_process').with_args(
+        other_process, (process.stdout,)
+    ).and_return(other_process.stdout)
+    flexmock(process.stdout).should_call('readline').once()
+
+    module.log_outputs(
+        (process, other_process),
+        exclude_stdouts=(process.stdout,),
+        output_log_level=logging.INFO,
+        borg_local_path='borg',
+    )
+
+
 def test_log_outputs_truncates_long_error_output():
     flexmock(module).ERROR_OUTPUT_MAX_LINE_COUNT = 0
     flexmock(module.logger).should_receive('log')