execute.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. import collections
  2. import enum
  3. import logging
  4. import select
  5. import subprocess
  6. import textwrap
  7. import borgmatic.logger
  8. logger = logging.getLogger(__name__)
  9. ERROR_OUTPUT_MAX_LINE_COUNT = 25
  10. BORG_ERROR_EXIT_CODE_START = 2
  11. BORG_ERROR_EXIT_CODE_END = 99
  12. class Exit_status(enum.Enum):
  13. STILL_RUNNING = 1
  14. SUCCESS = 2
  15. WARNING = 3
  16. ERROR = 4
  17. def interpret_exit_code(command, exit_code, borg_local_path=None, borg_exit_codes=None):
  18. '''
  19. Return an Exit_status value (e.g. SUCCESS, ERROR, or WARNING) based on interpreting the given
  20. exit code. If a Borg local path is given and matches the process' command, then interpret the
  21. exit code based on Borg's documented exit code semantics. And if Borg exit codes are given as a
  22. sequence of exit code configuration dicts, then take those configured preferences into account.
  23. '''
  24. if exit_code is None:
  25. return Exit_status.STILL_RUNNING
  26. if exit_code == 0:
  27. return Exit_status.SUCCESS
  28. if borg_local_path and command[0] == borg_local_path:
  29. # First try looking for the exit code in the borg_exit_codes configuration.
  30. for entry in borg_exit_codes or ():
  31. if entry.get('code') == exit_code:
  32. treat_as = entry.get('treat_as')
  33. if treat_as == 'error':
  34. logger.error(
  35. f'Treating exit code {exit_code} as an error, as per configuration'
  36. )
  37. return Exit_status.ERROR
  38. elif treat_as == 'warning':
  39. logger.warning(
  40. f'Treating exit code {exit_code} as a warning, as per configuration'
  41. )
  42. return Exit_status.WARNING
  43. # If the exit code doesn't have explicit configuration, then fall back to the default Borg
  44. # behavior.
  45. return (
  46. Exit_status.ERROR
  47. if (
  48. exit_code < 0
  49. or (
  50. exit_code >= BORG_ERROR_EXIT_CODE_START
  51. and exit_code <= BORG_ERROR_EXIT_CODE_END
  52. )
  53. )
  54. else Exit_status.WARNING
  55. )
  56. return Exit_status.ERROR
  57. def command_for_process(process):
  58. '''
  59. Given a process as an instance of subprocess.Popen, return the command string that was used to
  60. invoke it.
  61. '''
  62. return process.args if isinstance(process.args, str) else ' '.join(process.args)
  63. def output_buffer_for_process(process, exclude_stdouts):
  64. '''
  65. Given a process as an instance of subprocess.Popen and a sequence of stdouts to exclude, return
  66. either the process's stdout or stderr. The idea is that if stdout is excluded for a process, we
  67. still have stderr to log.
  68. '''
  69. return process.stderr if process.stdout in exclude_stdouts else process.stdout
  70. def append_last_lines(last_lines, captured_output, line, output_log_level):
  71. '''
  72. Given a rolling list of last lines, a list of captured output, a line to append, and an output
  73. log level, append the line to the last lines and (if necessary) the captured output. Then log
  74. the line at the requested output log level.
  75. '''
  76. last_lines.append(line)
  77. if len(last_lines) > ERROR_OUTPUT_MAX_LINE_COUNT:
  78. last_lines.pop(0)
  79. if output_log_level is None:
  80. captured_output.append(line)
  81. else:
  82. logger.log(output_log_level, line)
  83. def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path, borg_exit_codes):
  84. '''
  85. Given a sequence of subprocess.Popen() instances for multiple processes, log the output for each
  86. process with the requested log level. Additionally, raise a CalledProcessError if a process
  87. exits with an error (or a warning for exit code 1, if that process does not match the Borg local
  88. path).
  89. If output log level is None, then instead of logging, capture output for each process and return
  90. it as a dict from the process to its output. Use the given Borg local path and exit code
  91. configuration to decide what's an error and what's a warning.
  92. For simplicity, it's assumed that the output buffer for each process is its stdout. But if any
  93. stdouts are given to exclude, then for any matching processes, log from their stderr instead.
  94. Note that stdout for a process can be None if output is intentionally not captured. In which
  95. case it won't be logged.
  96. '''
  97. # Map from output buffer to sequence of last lines.
  98. buffer_last_lines = collections.defaultdict(list)
  99. process_for_output_buffer = {
  100. output_buffer_for_process(process, exclude_stdouts): process
  101. for process in processes
  102. if process.stdout or process.stderr
  103. }
  104. output_buffers = list(process_for_output_buffer.keys())
  105. captured_outputs = collections.defaultdict(list)
  106. still_running = True
  107. # Log output for each process until they all exit.
  108. while True:
  109. if output_buffers:
  110. (ready_buffers, _, _) = select.select(output_buffers, [], [])
  111. for ready_buffer in ready_buffers:
  112. ready_process = process_for_output_buffer.get(ready_buffer)
  113. # The "ready" process has exited, but it might be a pipe destination with other
  114. # processes (pipe sources) waiting to be read from. So as a measure to prevent
  115. # hangs, vent all processes when one exits.
  116. if ready_process and ready_process.poll() is not None:
  117. for other_process in processes:
  118. if (
  119. other_process.poll() is None
  120. and other_process.stdout
  121. and other_process.stdout not in output_buffers
  122. ):
  123. # Add the process's output to output_buffers to ensure it'll get read.
  124. output_buffers.append(other_process.stdout)
  125. while True:
  126. line = ready_buffer.readline().rstrip().decode()
  127. if not line or not ready_process:
  128. break
  129. # Keep the last few lines of output in case the process errors, and we need the output for
  130. # the exception below.
  131. append_last_lines(
  132. buffer_last_lines[ready_buffer],
  133. captured_outputs[ready_process],
  134. line,
  135. output_log_level,
  136. )
  137. if not still_running:
  138. break
  139. still_running = False
  140. for process in processes:
  141. exit_code = process.poll() if output_buffers else process.wait()
  142. if exit_code is None:
  143. still_running = True
  144. command = process.args.split(' ') if isinstance(process.args, str) else process.args
  145. continue
  146. command = process.args.split(' ') if isinstance(process.args, str) else process.args
  147. exit_status = interpret_exit_code(command, exit_code, borg_local_path, borg_exit_codes)
  148. if exit_status in (Exit_status.ERROR, Exit_status.WARNING):
  149. # If an error occurs, include its output in the raised exception so that we don't
  150. # inadvertently hide error output.
  151. output_buffer = output_buffer_for_process(process, exclude_stdouts)
  152. last_lines = buffer_last_lines[output_buffer] if output_buffer else []
  153. # Collect any straggling output lines that came in since we last gathered output.
  154. while output_buffer: # pragma: no cover
  155. line = output_buffer.readline().rstrip().decode()
  156. if not line:
  157. break
  158. append_last_lines(
  159. last_lines, captured_outputs[process], line, output_log_level=logging.ERROR
  160. )
  161. if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT:
  162. last_lines.insert(0, '...')
  163. # Something has gone wrong. So vent each process' output buffer to prevent it from
  164. # hanging. And then kill the process.
  165. for other_process in processes:
  166. if other_process.poll() is None:
  167. other_process.stdout.read(0)
  168. other_process.kill()
  169. if exit_status == Exit_status.ERROR:
  170. raise subprocess.CalledProcessError(
  171. exit_code, command_for_process(process), '\n'.join(last_lines)
  172. )
  173. still_running = False
  174. break
  175. if captured_outputs:
  176. return {
  177. process: '\n'.join(output_lines) for process, output_lines in captured_outputs.items()
  178. }
  179. SECRET_COMMAND_FLAG_NAMES = {'--password'}
  180. def mask_command_secrets(full_command):
  181. '''
  182. Given a command as a sequence, mask secret values for flags like "--password" in preparation for
  183. logging.
  184. '''
  185. masked_command = []
  186. previous_piece = None
  187. for piece in full_command:
  188. masked_command.append('***' if previous_piece in SECRET_COMMAND_FLAG_NAMES else piece)
  189. previous_piece = piece
  190. return tuple(masked_command)
  191. MAX_LOGGED_COMMAND_LENGTH = 1000
  192. PREFIXES_OF_ENVIRONMENT_VARIABLES_TO_LOG = ('BORG_', 'PG', 'MARIADB_', 'MYSQL_')
  193. def log_command(full_command, input_file=None, output_file=None, environment=None):
  194. '''
  195. Log the given command (a sequence of command/argument strings), along with its input/output file
  196. paths and extra environment variables (with omitted values in case they contain passwords).
  197. '''
  198. logger.debug(
  199. textwrap.shorten(
  200. ' '.join(
  201. tuple(
  202. f'{key}=***'
  203. for key in (environment or {}).keys()
  204. if any(
  205. key.startswith(prefix)
  206. for prefix in PREFIXES_OF_ENVIRONMENT_VARIABLES_TO_LOG
  207. )
  208. )
  209. + mask_command_secrets(full_command)
  210. ),
  211. width=MAX_LOGGED_COMMAND_LENGTH,
  212. placeholder=' ...',
  213. )
  214. + (f" < {getattr(input_file, 'name', input_file)}" if input_file else '')
  215. + (f" > {getattr(output_file, 'name', output_file)}" if output_file else '')
  216. )
  217. # A sentinel passed as an output file to execute_command() to indicate that the command's output
  218. # should be allowed to flow through to stdout without being captured for logging. Useful for
  219. # commands with interactive prompts or those that mess directly with the console.
  220. DO_NOT_CAPTURE = object()
  221. def execute_command(
  222. full_command,
  223. output_log_level=logging.INFO,
  224. output_file=None,
  225. input_file=None,
  226. shell=False,
  227. environment=None,
  228. working_directory=None,
  229. borg_local_path=None,
  230. borg_exit_codes=None,
  231. run_to_completion=True,
  232. ):
  233. '''
  234. Execute the given command (a sequence of command/argument strings) and log its output at the
  235. given log level. If an open output file object is given, then write stdout to the file and only
  236. log stderr. If an open input file object is given, then read stdin from the file. If shell is
  237. True, execute the command within a shell. If an environment variables dict is given, then pass
  238. it into the command. If a working directory is given, use that as the present working directory
  239. when running the command. If a Borg local path is given, and the command matches it (regardless
  240. of arguments), treat exit code 1 as a warning instead of an error. But if Borg exit codes are
  241. given as a sequence of exit code configuration dicts, then use that configuration to decide
  242. what's an error and what's a warning. If run to completion is False, then return the process for
  243. the command without executing it to completion.
  244. Raise subprocesses.CalledProcessError if an error occurs while running the command.
  245. '''
  246. log_command(full_command, input_file, output_file, environment)
  247. do_not_capture = bool(output_file is DO_NOT_CAPTURE)
  248. command = ' '.join(full_command) if shell else full_command
  249. process = subprocess.Popen(
  250. command,
  251. stdin=input_file,
  252. stdout=None if do_not_capture else (output_file or subprocess.PIPE),
  253. stderr=None if do_not_capture else (subprocess.PIPE if output_file else subprocess.STDOUT),
  254. shell=shell,
  255. env=environment,
  256. cwd=working_directory,
  257. # Necessary for passing credentials via anonymous pipe.
  258. close_fds=False,
  259. )
  260. if not run_to_completion:
  261. return process
  262. with borgmatic.logger.Log_prefix(None): # Log command output without any prefix.
  263. log_outputs(
  264. (process,),
  265. (input_file, output_file),
  266. output_log_level,
  267. borg_local_path,
  268. borg_exit_codes,
  269. )
  270. def execute_command_and_capture_output(
  271. full_command,
  272. input_file=None,
  273. capture_stderr=False,
  274. shell=False,
  275. environment=None,
  276. working_directory=None,
  277. borg_local_path=None,
  278. borg_exit_codes=None,
  279. ):
  280. '''
  281. Execute the given command (a sequence of command/argument strings), capturing and returning its
  282. output (stdout). If an input file descriptor is given, then pipe it to the command's stdin. If
  283. capture stderr is True, then capture and return stderr in addition to stdout. If shell is True,
  284. execute the command within a shell. If an environment variables dict is given, then pass it into
  285. the command. If a working directory is given, use that as the present working directory when
  286. running the command. If a Borg local path is given, and the command matches it (regardless of
  287. arguments), treat exit code 1 as a warning instead of an error. But if Borg exit codes are given
  288. as a sequence of exit code configuration dicts, then use that configuration to decide what's an
  289. error and what's a warning.
  290. Raise subprocesses.CalledProcessError if an error occurs while running the command.
  291. '''
  292. log_command(full_command, input_file, environment=environment)
  293. command = ' '.join(full_command) if shell else full_command
  294. try:
  295. output = subprocess.check_output(
  296. command,
  297. stdin=input_file,
  298. stderr=subprocess.STDOUT if capture_stderr else None,
  299. shell=shell,
  300. env=environment,
  301. cwd=working_directory,
  302. # Necessary for passing credentials via anonymous pipe.
  303. close_fds=False,
  304. )
  305. except subprocess.CalledProcessError as error:
  306. if (
  307. interpret_exit_code(command, error.returncode, borg_local_path, borg_exit_codes)
  308. == Exit_status.ERROR
  309. ):
  310. raise
  311. output = error.output
  312. return output.decode() if output is not None else None
  313. def execute_command_with_processes(
  314. full_command,
  315. processes,
  316. output_log_level=logging.INFO,
  317. output_file=None,
  318. input_file=None,
  319. shell=False,
  320. environment=None,
  321. working_directory=None,
  322. borg_local_path=None,
  323. borg_exit_codes=None,
  324. ):
  325. '''
  326. Execute the given command (a sequence of command/argument strings) and log its output at the
  327. given log level. Simultaneously, continue to poll one or more active processes so that they
  328. run as well. This is useful, for instance, for processes that are streaming output to a named
  329. pipe that the given command is consuming from.
  330. If an open output file object is given, then write stdout to the file and only log stderr. But
  331. if output log level is None, instead suppress logging and return the captured output for (only)
  332. the given command. If an open input file object is given, then read stdin from the file. If
  333. shell is True, execute the command within a shell. If an environment variables dict is given,
  334. then pass it into the command. If a working directory is given, use that as the present working
  335. directory when running the command. If a Borg local path is given, then for any matching command
  336. or process (regardless of arguments), treat exit code 1 as a warning instead of an error. But if
  337. Borg exit codes are given as a sequence of exit code configuration dicts, then use that
  338. configuration to decide what's an error and what's a warning.
  339. Raise subprocesses.CalledProcessError if an error occurs while running the command or in the
  340. upstream process.
  341. '''
  342. log_command(full_command, input_file, output_file, environment)
  343. do_not_capture = bool(output_file is DO_NOT_CAPTURE)
  344. command = ' '.join(full_command) if shell else full_command
  345. try:
  346. command_process = subprocess.Popen(
  347. command,
  348. stdin=input_file,
  349. stdout=None if do_not_capture else (output_file or subprocess.PIPE),
  350. stderr=(
  351. None if do_not_capture else (subprocess.PIPE if output_file else subprocess.STDOUT)
  352. ),
  353. shell=shell,
  354. env=environment,
  355. cwd=working_directory,
  356. # Necessary for passing credentials via anonymous pipe.
  357. close_fds=False,
  358. )
  359. except (subprocess.CalledProcessError, OSError):
  360. # Something has gone wrong. So vent each process' output buffer to prevent it from hanging.
  361. # And then kill the process.
  362. for process in processes:
  363. if process.poll() is None:
  364. process.stdout.read(0)
  365. process.kill()
  366. raise
  367. with borgmatic.logger.Log_prefix(None): # Log command output without any prefix.
  368. captured_outputs = log_outputs(
  369. tuple(processes) + (command_process,),
  370. (input_file, output_file),
  371. output_log_level,
  372. borg_local_path,
  373. borg_exit_codes,
  374. )
  375. if output_log_level is None:
  376. return captured_outputs.get(command_process)