DockerApi.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. import psutil
  2. import sys
  3. import os
  4. import re
  5. import time
  6. import json
  7. import asyncio
  8. import platform
  9. from datetime import datetime
  10. from fastapi import FastAPI, Response, Request
  11. class DockerApi:
  12. def __init__(self, redis_client, sync_docker_client, async_docker_client, logger):
  13. self.redis_client = redis_client
  14. self.sync_docker_client = sync_docker_client
  15. self.async_docker_client = async_docker_client
  16. self.logger = logger
  17. self.host_stats_isUpdating = False
  18. self.containerIds_to_update = []
  19. # api call: container_post - post_action: stop
  20. def container_post__stop(self, request_json, **kwargs):
  21. if 'container_id' in kwargs:
  22. filters = {"id": kwargs['container_id']}
  23. elif 'container_name' in kwargs:
  24. filters = {"name": kwargs['container_name']}
  25. for container in self.sync_docker_client.containers.list(all=True, filters=filters):
  26. container.stop()
  27. res = { 'type': 'success', 'msg': 'command completed successfully'}
  28. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  29. # api call: container_post - post_action: start
  30. def container_post__start(self, request_json, **kwargs):
  31. if 'container_id' in kwargs:
  32. filters = {"id": kwargs['container_id']}
  33. elif 'container_name' in kwargs:
  34. filters = {"name": kwargs['container_name']}
  35. for container in self.sync_docker_client.containers.list(all=True, filters=filters):
  36. container.start()
  37. res = { 'type': 'success', 'msg': 'command completed successfully'}
  38. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  39. # api call: container_post - post_action: restart
  40. def container_post__restart(self, request_json, **kwargs):
  41. if 'container_id' in kwargs:
  42. filters = {"id": kwargs['container_id']}
  43. elif 'container_name' in kwargs:
  44. filters = {"name": kwargs['container_name']}
  45. for container in self.sync_docker_client.containers.list(all=True, filters=filters):
  46. container.restart()
  47. res = { 'type': 'success', 'msg': 'command completed successfully'}
  48. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  49. # api call: container_post - post_action: top
  50. def container_post__top(self, request_json, **kwargs):
  51. if 'container_id' in kwargs:
  52. filters = {"id": kwargs['container_id']}
  53. elif 'container_name' in kwargs:
  54. filters = {"name": kwargs['container_name']}
  55. for container in self.sync_docker_client.containers.list(all=True, filters=filters):
  56. res = { 'type': 'success', 'msg': container.top()}
  57. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  58. # api call: container_post - post_action: stats
  59. def container_post__stats(self, request_json, **kwargs):
  60. if 'container_id' in kwargs:
  61. filters = {"id": kwargs['container_id']}
  62. elif 'container_name' in kwargs:
  63. filters = {"name": kwargs['container_name']}
  64. for container in self.sync_docker_client.containers.list(all=True, filters=filters):
  65. for stat in container.stats(decode=True, stream=True):
  66. res = { 'type': 'success', 'msg': stat}
  67. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  68. # api call: container_post - post_action: exec - cmd: mailq - task: delete
  69. def container_post__exec__mailq__delete(self, request_json, **kwargs):
  70. if 'container_id' in kwargs:
  71. filters = {"id": kwargs['container_id']}
  72. elif 'container_name' in kwargs:
  73. filters = {"name": kwargs['container_name']}
  74. if 'items' in request_json:
  75. r = re.compile("^[0-9a-fA-F]+$")
  76. filtered_qids = filter(r.match, request_json['items'])
  77. if filtered_qids:
  78. flagged_qids = ['-d %s' % i for i in filtered_qids]
  79. sanitized_string = str(' '.join(flagged_qids))
  80. for container in self.sync_docker_client.containers.list(filters=filters):
  81. postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
  82. return self.exec_run_handler('generic', postsuper_r)
  83. # api call: container_post - post_action: exec - cmd: mailq - task: hold
  84. def container_post__exec__mailq__hold(self, request_json, **kwargs):
  85. if 'container_id' in kwargs:
  86. filters = {"id": kwargs['container_id']}
  87. elif 'container_name' in kwargs:
  88. filters = {"name": kwargs['container_name']}
  89. if 'items' in request_json:
  90. r = re.compile("^[0-9a-fA-F]+$")
  91. filtered_qids = filter(r.match, request_json['items'])
  92. if filtered_qids:
  93. flagged_qids = ['-h %s' % i for i in filtered_qids]
  94. sanitized_string = str(' '.join(flagged_qids))
  95. for container in self.sync_docker_client.containers.list(filters=filters):
  96. postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
  97. return self.exec_run_handler('generic', postsuper_r)
  98. # api call: container_post - post_action: exec - cmd: mailq - task: cat
  99. def container_post__exec__mailq__cat(self, request_json, **kwargs):
  100. if 'container_id' in kwargs:
  101. filters = {"id": kwargs['container_id']}
  102. elif 'container_name' in kwargs:
  103. filters = {"name": kwargs['container_name']}
  104. if 'items' in request_json:
  105. r = re.compile("^[0-9a-fA-F]+$")
  106. filtered_qids = filter(r.match, request_json['items'])
  107. if filtered_qids:
  108. sanitized_string = str(' '.join(filtered_qids))
  109. for container in self.sync_docker_client.containers.list(filters=filters):
  110. postcat_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix')
  111. if not postcat_return:
  112. postcat_return = 'err: invalid'
  113. return self.exec_run_handler('utf8_text_only', postcat_return)
  114. # api call: container_post - post_action: exec - cmd: mailq - task: unhold
  115. def container_post__exec__mailq__unhold(self, request_json, **kwargs):
  116. if 'container_id' in kwargs:
  117. filters = {"id": kwargs['container_id']}
  118. elif 'container_name' in kwargs:
  119. filters = {"name": kwargs['container_name']}
  120. if 'items' in request_json:
  121. r = re.compile("^[0-9a-fA-F]+$")
  122. filtered_qids = filter(r.match, request_json['items'])
  123. if filtered_qids:
  124. flagged_qids = ['-H %s' % i for i in filtered_qids]
  125. sanitized_string = str(' '.join(flagged_qids))
  126. for container in self.sync_docker_client.containers.list(filters=filters):
  127. postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
  128. return self.exec_run_handler('generic', postsuper_r)
  129. # api call: container_post - post_action: exec - cmd: mailq - task: deliver
  130. def container_post__exec__mailq__deliver(self, request_json, **kwargs):
  131. if 'container_id' in kwargs:
  132. filters = {"id": kwargs['container_id']}
  133. elif 'container_name' in kwargs:
  134. filters = {"name": kwargs['container_name']}
  135. if 'items' in request_json:
  136. r = re.compile("^[0-9a-fA-F]+$")
  137. filtered_qids = filter(r.match, request_json['items'])
  138. if filtered_qids:
  139. flagged_qids = ['-i %s' % i for i in filtered_qids]
  140. for container in self.sync_docker_client.containers.list(filters=filters):
  141. for i in flagged_qids:
  142. postqueue_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix')
  143. # todo: check each exit code
  144. res = { 'type': 'success', 'msg': 'Scheduled immediate delivery'}
  145. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  146. # api call: container_post - post_action: exec - cmd: mailq - task: list
  147. def container_post__exec__mailq__list(self, request_json, **kwargs):
  148. if 'container_id' in kwargs:
  149. filters = {"id": kwargs['container_id']}
  150. elif 'container_name' in kwargs:
  151. filters = {"name": kwargs['container_name']}
  152. for container in self.sync_docker_client.containers.list(filters=filters):
  153. mailq_return = container.exec_run(["/usr/sbin/postqueue", "-j"], user='postfix')
  154. return self.exec_run_handler('utf8_text_only', mailq_return)
  155. # api call: container_post - post_action: exec - cmd: mailq - task: flush
  156. def container_post__exec__mailq__flush(self, request_json, **kwargs):
  157. if 'container_id' in kwargs:
  158. filters = {"id": kwargs['container_id']}
  159. elif 'container_name' in kwargs:
  160. filters = {"name": kwargs['container_name']}
  161. for container in self.sync_docker_client.containers.list(filters=filters):
  162. postqueue_r = container.exec_run(["/usr/sbin/postqueue", "-f"], user='postfix')
  163. return self.exec_run_handler('generic', postqueue_r)
  164. # api call: container_post - post_action: exec - cmd: mailq - task: super_delete
  165. def container_post__exec__mailq__super_delete(self, request_json, **kwargs):
  166. if 'container_id' in kwargs:
  167. filters = {"id": kwargs['container_id']}
  168. elif 'container_name' in kwargs:
  169. filters = {"name": kwargs['container_name']}
  170. for container in self.sync_docker_client.containers.list(filters=filters):
  171. postsuper_r = container.exec_run(["/usr/sbin/postsuper", "-d", "ALL"])
  172. return self.exec_run_handler('generic', postsuper_r)
  173. # api call: container_post - post_action: exec - cmd: system - task: fts_rescan
  174. def container_post__exec__system__fts_rescan(self, request_json, **kwargs):
  175. if 'container_id' in kwargs:
  176. filters = {"id": kwargs['container_id']}
  177. elif 'container_name' in kwargs:
  178. filters = {"name": kwargs['container_name']}
  179. if 'username' in request_json:
  180. for container in self.sync_docker_client.containers.list(filters=filters):
  181. rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request_json['username'].replace("'", "'\\''") + "'"], user='vmail')
  182. if rescan_return.exit_code == 0:
  183. res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'}
  184. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  185. else:
  186. res = { 'type': 'warning', 'msg': 'fts_rescan error'}
  187. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  188. if 'all' in request_json:
  189. for container in self.sync_docker_client.containers.list(filters=filters):
  190. rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail')
  191. if rescan_return.exit_code == 0:
  192. res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'}
  193. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  194. else:
  195. res = { 'type': 'warning', 'msg': 'fts_rescan error'}
  196. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  197. # api call: container_post - post_action: exec - cmd: system - task: df
  198. def container_post__exec__system__df(self, request_json, **kwargs):
  199. if 'container_id' in kwargs:
  200. filters = {"id": kwargs['container_id']}
  201. elif 'container_name' in kwargs:
  202. filters = {"name": kwargs['container_name']}
  203. if 'dir' in request_json:
  204. for container in self.sync_docker_client.containers.list(filters=filters):
  205. df_return = container.exec_run(["/bin/bash", "-c", "/bin/df -H '" + request_json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody')
  206. if df_return.exit_code == 0:
  207. return df_return.output.decode('utf-8').rstrip()
  208. else:
  209. return "0,0,0,0,0,0"
  210. # api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade
  211. def container_post__exec__system__mysql_upgrade(self, request_json, **kwargs):
  212. if 'container_id' in kwargs:
  213. filters = {"id": kwargs['container_id']}
  214. elif 'container_name' in kwargs:
  215. filters = {"name": kwargs['container_name']}
  216. for container in self.sync_docker_client.containers.list(filters=filters):
  217. sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql')
  218. if sql_return.exit_code == 0:
  219. matched = False
  220. for line in sql_return.output.decode('utf-8').split("\n"):
  221. if 'is already upgraded to' in line:
  222. matched = True
  223. if matched:
  224. res = { 'type': 'success', 'msg':'mysql_upgrade: already upgraded', 'text': sql_return.output.decode('utf-8')}
  225. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  226. else:
  227. container.restart()
  228. res = { 'type': 'warning', 'msg':'mysql_upgrade: upgrade was applied', 'text': sql_return.output.decode('utf-8')}
  229. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  230. else:
  231. res = { 'type': 'error', 'msg': 'mysql_upgrade: error running command', 'text': sql_return.output.decode('utf-8')}
  232. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  233. # api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql
  234. def container_post__exec__system__mysql_tzinfo_to_sql(self, request_json, **kwargs):
  235. if 'container_id' in kwargs:
  236. filters = {"id": kwargs['container_id']}
  237. elif 'container_name' in kwargs:
  238. filters = {"name": kwargs['container_name']}
  239. for container in self.sync_docker_client.containers.list(filters=filters):
  240. sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql')
  241. if sql_return.exit_code == 0:
  242. res = { 'type': 'info', 'msg': 'mysql_tzinfo_to_sql: command completed successfully', 'text': sql_return.output.decode('utf-8')}
  243. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  244. else:
  245. res = { 'type': 'error', 'msg': 'mysql_tzinfo_to_sql: error running command', 'text': sql_return.output.decode('utf-8')}
  246. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  247. # api call: container_post - post_action: exec - cmd: reload - task: dovecot
  248. def container_post__exec__reload__dovecot(self, request_json, **kwargs):
  249. if 'container_id' in kwargs:
  250. filters = {"id": kwargs['container_id']}
  251. elif 'container_name' in kwargs:
  252. filters = {"name": kwargs['container_name']}
  253. for container in self.sync_docker_client.containers.list(filters=filters):
  254. reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/dovecot reload"])
  255. return self.exec_run_handler('generic', reload_return)
  256. # api call: container_post - post_action: exec - cmd: reload - task: postfix
  257. def container_post__exec__reload__postfix(self, request_json, **kwargs):
  258. if 'container_id' in kwargs:
  259. filters = {"id": kwargs['container_id']}
  260. elif 'container_name' in kwargs:
  261. filters = {"name": kwargs['container_name']}
  262. for container in self.sync_docker_client.containers.list(filters=filters):
  263. reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postfix reload"])
  264. return self.exec_run_handler('generic', reload_return)
  265. # api call: container_post - post_action: exec - cmd: reload - task: nginx
  266. def container_post__exec__reload__nginx(self, request_json, **kwargs):
  267. if 'container_id' in kwargs:
  268. filters = {"id": kwargs['container_id']}
  269. elif 'container_name' in kwargs:
  270. filters = {"name": kwargs['container_name']}
  271. for container in self.sync_docker_client.containers.list(filters=filters):
  272. reload_return = container.exec_run(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"])
  273. return self.exec_run_handler('generic', reload_return)
  274. # api call: container_post - post_action: exec - cmd: sieve - task: list
  275. def container_post__exec__sieve__list(self, request_json, **kwargs):
  276. if 'container_id' in kwargs:
  277. filters = {"id": kwargs['container_id']}
  278. elif 'container_name' in kwargs:
  279. filters = {"name": kwargs['container_name']}
  280. if 'username' in request_json:
  281. for container in self.sync_docker_client.containers.list(filters=filters):
  282. sieve_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request_json['username'].replace("'", "'\\''") + "'"])
  283. return self.exec_run_handler('utf8_text_only', sieve_return)
  284. # api call: container_post - post_action: exec - cmd: sieve - task: print
  285. def container_post__exec__sieve__print(self, request_json, **kwargs):
  286. if 'container_id' in kwargs:
  287. filters = {"id": kwargs['container_id']}
  288. elif 'container_name' in kwargs:
  289. filters = {"name": kwargs['container_name']}
  290. if 'username' in request_json and 'script_name' in request_json:
  291. for container in self.sync_docker_client.containers.list(filters=filters):
  292. cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request_json['username'].replace("'", "'\\''") + "' '" + request_json['script_name'].replace("'", "'\\''") + "'"]
  293. sieve_return = container.exec_run(cmd)
  294. return self.exec_run_handler('utf8_text_only', sieve_return)
  295. # api call: container_post - post_action: exec - cmd: maildir - task: cleanup
  296. def container_post__exec__maildir__cleanup(self, request_json, **kwargs):
  297. if 'container_id' in kwargs:
  298. filters = {"id": kwargs['container_id']}
  299. elif 'container_name' in kwargs:
  300. filters = {"name": kwargs['container_name']}
  301. if 'maildir' in request_json:
  302. for container in self.sync_docker_client.containers.list(filters=filters):
  303. sane_name = re.sub(r'\W+', '', request_json['maildir'])
  304. vmail_name = request_json['maildir'].replace("'", "'\\''")
  305. cmd_vmail = "if [[ -d '/var/vmail/" + vmail_name + "' ]]; then /bin/mv '/var/vmail/" + vmail_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi"
  306. index_name = request_json['maildir'].split("/")
  307. if len(index_name) > 1:
  308. index_name = index_name[1].replace("'", "'\\''") + "@" + index_name[0].replace("'", "'\\''")
  309. cmd_vmail_index = "if [[ -d '/var/vmail_index/" + index_name + "' ]]; then /bin/mv '/var/vmail_index/" + index_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "_index'; fi"
  310. cmd = ["/bin/bash", "-c", cmd_vmail + " && " + cmd_vmail_index]
  311. else:
  312. cmd = ["/bin/bash", "-c", cmd_vmail]
  313. maildir_cleanup = container.exec_run(cmd, user='vmail')
  314. return self.exec_run_handler('generic', maildir_cleanup)
  315. # api call: container_post - post_action: exec - cmd: rspamd - task: worker_password
  316. def container_post__exec__rspamd__worker_password(self, request_json, **kwargs):
  317. if 'container_id' in kwargs:
  318. filters = {"id": kwargs['container_id']}
  319. elif 'container_name' in kwargs:
  320. filters = {"name": kwargs['container_name']}
  321. if 'raw' in request_json:
  322. for container in self.sync_docker_client.containers.list(filters=filters):
  323. cmd = "/usr/bin/rspamadm pw -e -p '" + request_json['raw'].replace("'", "'\\''") + "' 2> /dev/null"
  324. cmd_response = self.exec_cmd_container(container, cmd, user="_rspamd")
  325. matched = False
  326. for line in cmd_response.split("\n"):
  327. if '$2$' in line:
  328. hash = line.strip()
  329. hash_out = re.search('\$2\$.+$', hash).group(0)
  330. rspamd_passphrase_hash = re.sub('[^0-9a-zA-Z\$]+', '', hash_out.rstrip())
  331. rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc"
  332. cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename)
  333. cmd_response = self.exec_cmd_container(container, cmd, user="_rspamd")
  334. if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in cmd_response:
  335. container.restart()
  336. matched = True
  337. if matched:
  338. res = { 'type': 'success', 'msg': 'command completed successfully' }
  339. self.logger.info('success changing Rspamd password')
  340. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  341. else:
  342. self.logger.error('failed changing Rspamd password')
  343. res = { 'type': 'danger', 'msg': 'command did not complete' }
  344. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  345. # Collect host stats
  346. async def get_host_stats(self, wait=5):
  347. try:
  348. system_time = datetime.now()
  349. host_stats = {
  350. "cpu": {
  351. "cores": psutil.cpu_count(),
  352. "usage": psutil.cpu_percent()
  353. },
  354. "memory": {
  355. "total": psutil.virtual_memory().total,
  356. "usage": psutil.virtual_memory().percent,
  357. "swap": psutil.swap_memory()
  358. },
  359. "uptime": time.time() - psutil.boot_time(),
  360. "system_time": system_time.strftime("%d.%m.%Y %H:%M:%S"),
  361. "architecture": platform.machine()
  362. }
  363. await self.redis_client.set('host_stats', json.dumps(host_stats), ex=10)
  364. except Exception as e:
  365. res = {
  366. "type": "danger",
  367. "msg": str(e)
  368. }
  369. await asyncio.sleep(wait)
  370. self.host_stats_isUpdating = False
  371. # Collect container stats
  372. async def get_container_stats(self, container_id, wait=5, stop=False):
  373. if container_id and container_id.isalnum():
  374. try:
  375. for container in (await self.async_docker_client.containers.list()):
  376. if container._id == container_id:
  377. res = await container.stats(stream=False)
  378. if await self.redis_client.exists(container_id + '_stats'):
  379. stats = json.loads(await self.redis_client.get(container_id + '_stats'))
  380. else:
  381. stats = []
  382. stats.append(res[0])
  383. if len(stats) > 3:
  384. del stats[0]
  385. await self.redis_client.set(container_id + '_stats', json.dumps(stats), ex=60)
  386. except Exception as e:
  387. res = {
  388. "type": "danger",
  389. "msg": str(e)
  390. }
  391. else:
  392. res = {
  393. "type": "danger",
  394. "msg": "no or invalid id defined"
  395. }
  396. await asyncio.sleep(wait)
  397. if stop == True:
  398. # update task was called second time, stop
  399. self.containerIds_to_update.remove(container_id)
  400. else:
  401. # call update task a second time
  402. await self.get_container_stats(container_id, wait=0, stop=True)
  403. def exec_cmd_container(self, container, cmd, user, timeout=2, shell_cmd="/bin/bash"):
  404. def recv_socket_data(c_socket, timeout):
  405. c_socket.setblocking(0)
  406. total_data=[]
  407. data=''
  408. begin=time.time()
  409. while True:
  410. if total_data and time.time()-begin > timeout:
  411. break
  412. elif time.time()-begin > timeout*2:
  413. break
  414. try:
  415. data = c_socket.recv(8192)
  416. if data:
  417. total_data.append(data.decode('utf-8'))
  418. #change the beginning time for measurement
  419. begin=time.time()
  420. else:
  421. #sleep for sometime to indicate a gap
  422. time.sleep(0.1)
  423. break
  424. except:
  425. pass
  426. return ''.join(total_data)
  427. try :
  428. socket = container.exec_run([shell_cmd], stdin=True, socket=True, user=user).output._sock
  429. if not cmd.endswith("\n"):
  430. cmd = cmd + "\n"
  431. socket.send(cmd.encode('utf-8'))
  432. data = recv_socket_data(socket, timeout)
  433. socket.close()
  434. return data
  435. except Exception as e:
  436. self.logger.error("error - exec_cmd_container: %s" % str(e))
  437. traceback.print_exc(file=sys.stdout)
  438. def exec_run_handler(self, type, output):
  439. if type == 'generic':
  440. if output.exit_code == 0:
  441. res = { 'type': 'success', 'msg': 'command completed successfully' }
  442. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  443. else:
  444. res = { 'type': 'danger', 'msg': 'command failed: ' + output.output.decode('utf-8') }
  445. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  446. if type == 'utf8_text_only':
  447. return Response(content=output.output.decode('utf-8'), media_type="text/plain")