execute.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. import collections
  2. import logging
  3. import os
  4. import select
  5. import subprocess
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maximum number of trailing output lines that append_last_lines() keeps per output buffer. These
# lines are what log_outputs() attaches to the exception it raises when a process errors.
ERROR_OUTPUT_MAX_LINE_COUNT = 25

# Lowest non-negative exit code that exit_code_indicates_error() treats as an error for a command
# matching the Borg local path. Exit code 1 from such a command is treated as a warning instead.
BORG_ERROR_EXIT_CODE = 2
  9. def exit_code_indicates_error(command, exit_code, borg_local_path=None):
  10. '''
  11. Return True if the given exit code from running a command corresponds to an error. If a Borg
  12. local path is given and matches the process' command, then treat exit code 1 as a warning
  13. instead of an error.
  14. '''
  15. if exit_code is None:
  16. return False
  17. if borg_local_path and command[0] == borg_local_path:
  18. return bool(exit_code < 0 or exit_code >= BORG_ERROR_EXIT_CODE)
  19. return bool(exit_code != 0)
  20. def command_for_process(process):
  21. '''
  22. Given a process as an instance of subprocess.Popen, return the command string that was used to
  23. invoke it.
  24. '''
  25. return process.args if isinstance(process.args, str) else ' '.join(process.args)
  26. def output_buffer_for_process(process, exclude_stdouts):
  27. '''
  28. Given a process as an instance of subprocess.Popen and a sequence of stdouts to exclude, return
  29. either the process's stdout or stderr. The idea is that if stdout is excluded for a process, we
  30. still have stderr to log.
  31. '''
  32. return process.stderr if process.stdout in exclude_stdouts else process.stdout
  33. def append_last_lines(last_lines, captured_output, line, output_log_level):
  34. '''
  35. Given a rolling list of last lines, a list of captured output, a line to append, and an output
  36. log level, append the line to the last lines and (if necessary) the captured output. Then log
  37. the line at the requested output log level.
  38. '''
  39. last_lines.append(line)
  40. if len(last_lines) > ERROR_OUTPUT_MAX_LINE_COUNT:
  41. last_lines.pop(0)
  42. if output_log_level is None:
  43. captured_output.append(line)
  44. else:
  45. logger.log(output_log_level, line)
  46. def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path):
  47. '''
  48. Given a sequence of subprocess.Popen() instances for multiple processes, log the output for each
  49. process with the requested log level. Additionally, raise a CalledProcessError if a process
  50. exits with an error (or a warning for exit code 1, if that process does not match the Borg local
  51. path).
  52. If output log level is None, then instead of logging, capture output for each process and return
  53. it as a dict from the process to its output.
  54. For simplicity, it's assumed that the output buffer for each process is its stdout. But if any
  55. stdouts are given to exclude, then for any matching processes, log from their stderr instead.
  56. Note that stdout for a process can be None if output is intentionally not captured. In which
  57. case it won't be logged.
  58. '''
  59. # Map from output buffer to sequence of last lines.
  60. buffer_last_lines = collections.defaultdict(list)
  61. process_for_output_buffer = {
  62. output_buffer_for_process(process, exclude_stdouts): process
  63. for process in processes
  64. if process.stdout or process.stderr
  65. }
  66. output_buffers = list(process_for_output_buffer.keys())
  67. captured_outputs = collections.defaultdict(list)
  68. still_running = True
  69. # Log output for each process until they all exit.
  70. while True:
  71. if output_buffers:
  72. (ready_buffers, _, _) = select.select(output_buffers, [], [])
  73. for ready_buffer in ready_buffers:
  74. ready_process = process_for_output_buffer.get(ready_buffer)
  75. # The "ready" process has exited, but it might be a pipe destination with other
  76. # processes (pipe sources) waiting to be read from. So as a measure to prevent
  77. # hangs, vent all processes when one exits.
  78. if ready_process and ready_process.poll() is not None:
  79. for other_process in processes:
  80. if (
  81. other_process.poll() is None
  82. and other_process.stdout
  83. and other_process.stdout not in output_buffers
  84. ):
  85. # Add the process's output to output_buffers to ensure it'll get read.
  86. output_buffers.append(other_process.stdout)
  87. while True:
  88. line = ready_buffer.readline().rstrip().decode()
  89. if not line or not ready_process:
  90. break
  91. # Keep the last few lines of output in case the process errors, and we need the output for
  92. # the exception below.
  93. append_last_lines(
  94. buffer_last_lines[ready_buffer],
  95. captured_outputs[ready_process],
  96. line,
  97. output_log_level,
  98. )
  99. if not still_running:
  100. break
  101. still_running = False
  102. for process in processes:
  103. exit_code = process.poll() if output_buffers else process.wait()
  104. if exit_code is None:
  105. still_running = True
  106. command = process.args.split(' ') if isinstance(process.args, str) else process.args
  107. # If any process errors, then raise accordingly.
  108. if exit_code_indicates_error(command, exit_code, borg_local_path):
  109. # If an error occurs, include its output in the raised exception so that we don't
  110. # inadvertently hide error output.
  111. output_buffer = output_buffer_for_process(process, exclude_stdouts)
  112. last_lines = buffer_last_lines[output_buffer] if output_buffer else []
  113. # Collect any straggling output lines that came in since we last gathered output.
  114. while output_buffer: # pragma: no cover
  115. line = output_buffer.readline().rstrip().decode()
  116. if not line:
  117. break
  118. append_last_lines(
  119. last_lines, captured_outputs[process], line, output_log_level=logging.ERROR
  120. )
  121. if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT:
  122. last_lines.insert(0, '...')
  123. # Something has gone wrong. So vent each process' output buffer to prevent it from
  124. # hanging. And then kill the process.
  125. for other_process in processes:
  126. if other_process.poll() is None:
  127. other_process.stdout.read(0)
  128. other_process.kill()
  129. raise subprocess.CalledProcessError(
  130. exit_code, command_for_process(process), '\n'.join(last_lines)
  131. )
  132. if captured_outputs:
  133. return {
  134. process: '\n'.join(output_lines) for process, output_lines in captured_outputs.items()
  135. }
  136. def log_command(full_command, input_file=None, output_file=None):
  137. '''
  138. Log the given command (a sequence of command/argument strings), along with its input/output file
  139. paths.
  140. '''
  141. logger.debug(
  142. ' '.join(full_command)
  143. + (f" < {getattr(input_file, 'name', '')}" if input_file else '')
  144. + (f" > {getattr(output_file, 'name', '')}" if output_file else '')
  145. )
# A sentinel passed as an output file to execute_command() to indicate that the command's output
# should be allowed to flow through to stdout without being captured for logging. Useful for
# commands with interactive prompts or those that mess directly with the console. It is detected
# by identity ("output_file is DO_NOT_CAPTURE"), so pass this exact object.
DO_NOT_CAPTURE = object()
  150. def execute_command(
  151. full_command,
  152. output_log_level=logging.INFO,
  153. output_file=None,
  154. input_file=None,
  155. shell=False,
  156. extra_environment=None,
  157. working_directory=None,
  158. borg_local_path=None,
  159. run_to_completion=True,
  160. ):
  161. '''
  162. Execute the given command (a sequence of command/argument strings) and log its output at the
  163. given log level. If an open output file object is given, then write stdout to the file and only
  164. log stderr. If an open input file object is given, then read stdin from the file. If shell is
  165. True, execute the command within a shell. If an extra environment dict is given, then use it to
  166. augment the current environment, and pass the result into the command. If a working directory is
  167. given, use that as the present working directory when running the command. If a Borg local path
  168. is given, and the command matches it (regardless of arguments), treat exit code 1 as a warning
  169. instead of an error. If run to completion is False, then return the process for the command
  170. without executing it to completion.
  171. Raise subprocesses.CalledProcessError if an error occurs while running the command.
  172. '''
  173. log_command(full_command, input_file, output_file)
  174. environment = {**os.environ, **extra_environment} if extra_environment else None
  175. do_not_capture = bool(output_file is DO_NOT_CAPTURE)
  176. command = ' '.join(full_command) if shell else full_command
  177. process = subprocess.Popen(
  178. command,
  179. stdin=input_file,
  180. stdout=None if do_not_capture else (output_file or subprocess.PIPE),
  181. stderr=None if do_not_capture else (subprocess.PIPE if output_file else subprocess.STDOUT),
  182. shell=shell,
  183. env=environment,
  184. cwd=working_directory,
  185. )
  186. if not run_to_completion:
  187. return process
  188. log_outputs(
  189. (process,), (input_file, output_file), output_log_level, borg_local_path=borg_local_path
  190. )
  191. def execute_command_and_capture_output(
  192. full_command,
  193. capture_stderr=False,
  194. shell=False,
  195. extra_environment=None,
  196. working_directory=None,
  197. borg_local_path=None,
  198. ):
  199. '''
  200. Execute the given command (a sequence of command/argument strings), capturing and returning its
  201. output (stdout). If capture stderr is True, then capture and return stderr in addition to
  202. stdout. If shell is True, execute the command within a shell. If an extra environment dict is
  203. given, then use it to augment the current environment, and pass the result into the command. If
  204. a working directory is given, use that as the present working directory when running the
  205. command. If a Borg local path is given, and the command matches it (regardless of arguments),
  206. treat exit code 1 as a warning instead of an error.
  207. Raise subprocesses.CalledProcessError if an error occurs while running the command.
  208. '''
  209. log_command(full_command)
  210. environment = {**os.environ, **extra_environment} if extra_environment else None
  211. command = ' '.join(full_command) if shell else full_command
  212. try:
  213. output = subprocess.check_output(
  214. command,
  215. stderr=subprocess.STDOUT if capture_stderr else None,
  216. shell=shell,
  217. env=environment,
  218. cwd=working_directory,
  219. )
  220. except subprocess.CalledProcessError as error:
  221. if exit_code_indicates_error(command, error.returncode, borg_local_path):
  222. raise
  223. output = error.output
  224. return output.decode() if output is not None else None
  225. def execute_command_with_processes(
  226. full_command,
  227. processes,
  228. output_log_level=logging.INFO,
  229. output_file=None,
  230. input_file=None,
  231. shell=False,
  232. extra_environment=None,
  233. working_directory=None,
  234. borg_local_path=None,
  235. ):
  236. '''
  237. Execute the given command (a sequence of command/argument strings) and log its output at the
  238. given log level. Simultaneously, continue to poll one or more active processes so that they
  239. run as well. This is useful, for instance, for processes that are streaming output to a named
  240. pipe that the given command is consuming from.
  241. If an open output file object is given, then write stdout to the file and only log stderr. But
  242. if output log level is None, instead suppress logging and return the captured output for (only)
  243. the given command. If an open input file object is given, then read stdin from the file. If
  244. shell is True, execute the command within a shell. If an extra environment dict is given, then
  245. use it to augment the current environment, and pass the result into the command. If a working
  246. directory is given, use that as the present working directory when running the command. If a
  247. Borg local path is given, then for any matching command or process (regardless of arguments),
  248. treat exit code 1 as a warning instead of an error.
  249. Raise subprocesses.CalledProcessError if an error occurs while running the command or in the
  250. upstream process.
  251. '''
  252. log_command(full_command, input_file, output_file)
  253. environment = {**os.environ, **extra_environment} if extra_environment else None
  254. do_not_capture = bool(output_file is DO_NOT_CAPTURE)
  255. command = ' '.join(full_command) if shell else full_command
  256. try:
  257. command_process = subprocess.Popen(
  258. command,
  259. stdin=input_file,
  260. stdout=None if do_not_capture else (output_file or subprocess.PIPE),
  261. stderr=None
  262. if do_not_capture
  263. else (subprocess.PIPE if output_file else subprocess.STDOUT),
  264. shell=shell,
  265. env=environment,
  266. cwd=working_directory,
  267. )
  268. except (subprocess.CalledProcessError, OSError):
  269. # Something has gone wrong. So vent each process' output buffer to prevent it from hanging.
  270. # And then kill the process.
  271. for process in processes:
  272. if process.poll() is None:
  273. process.stdout.read(0)
  274. process.kill()
  275. raise
  276. captured_outputs = log_outputs(
  277. tuple(processes) + (command_process,),
  278. (input_file, output_file),
  279. output_log_level,
  280. borg_local_path=borg_local_path,
  281. )
  282. if output_log_level is None:
  283. return captured_outputs.get(command_process)