main.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. import os
  2. import sys
  3. import uvicorn
  4. import json
  5. import uuid
  6. import async_timeout
  7. import asyncio
  8. import aiodocker
  9. import docker
  10. import logging
  11. from logging.config import dictConfig
  12. from fastapi import FastAPI, Response, Request
  13. from modules.DockerApi import DockerApi
  14. from redis import asyncio as aioredis
  15. from contextlib import asynccontextmanager
  16. dockerapi = None
  17. @asynccontextmanager
  18. async def lifespan(app: FastAPI):
  19. global dockerapi
  20. # Initialize a custom logger
  21. logger = logging.getLogger("dockerapi")
  22. logger.setLevel(logging.INFO)
  23. # Configure the logger to output logs to the terminal
  24. handler = logging.StreamHandler()
  25. handler.setLevel(logging.INFO)
  26. formatter = logging.Formatter("%(levelname)s: %(message)s")
  27. handler.setFormatter(formatter)
  28. logger.addHandler(handler)
  29. logger.info("Init APP")
  30. # Init redis client
  31. if os.environ['REDIS_SLAVEOF_IP'] != "":
  32. redis_client = redis = await aioredis.from_url(f"redis://{os.environ['REDIS_SLAVEOF_IP']}:{os.environ['REDIS_SLAVEOF_PORT']}/0", password=os.environ['REDISPASS'])
  33. else:
  34. redis_client = redis = await aioredis.from_url("redis://redis-mailcow:6379/0", password=os.environ['REDISPASS'])
  35. # Init docker clients
  36. sync_docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto')
  37. async_docker_client = aiodocker.Docker(url='unix:///var/run/docker.sock')
  38. dockerapi = DockerApi(redis_client, sync_docker_client, async_docker_client, logger)
  39. logger.info("Subscribe to redis channel")
  40. # Subscribe to redis channel
  41. dockerapi.pubsub = redis.pubsub()
  42. await dockerapi.pubsub.subscribe("MC_CHANNEL")
  43. asyncio.create_task(handle_pubsub_messages(dockerapi.pubsub))
  44. yield
  45. # Close docker connections
  46. dockerapi.sync_docker_client.close()
  47. await dockerapi.async_docker_client.close()
  48. # Close redis
  49. await dockerapi.pubsub.unsubscribe("MC_CHANNEL")
  50. await dockerapi.redis_client.close()
  51. app = FastAPI(lifespan=lifespan)
  52. # Define Routes
  53. @app.get("/host/stats")
  54. async def get_host_update_stats():
  55. global dockerapi
  56. if dockerapi.host_stats_isUpdating == False:
  57. asyncio.create_task(dockerapi.get_host_stats())
  58. dockerapi.host_stats_isUpdating = True
  59. while True:
  60. if await dockerapi.redis_client.exists('host_stats'):
  61. break
  62. await asyncio.sleep(1.5)
  63. stats = json.loads(await dockerapi.redis_client.get('host_stats'))
  64. return Response(content=json.dumps(stats, indent=4), media_type="application/json")
  65. @app.get("/containers/{container_id}/json")
  66. async def get_container(container_id : str):
  67. global dockerapi
  68. if container_id and container_id.isalnum():
  69. try:
  70. for container in (await dockerapi.async_docker_client.containers.list()):
  71. if container._id == container_id:
  72. container_info = await container.show()
  73. return Response(content=json.dumps(container_info, indent=4), media_type="application/json")
  74. res = {
  75. "type": "danger",
  76. "msg": "no container found"
  77. }
  78. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  79. except Exception as e:
  80. res = {
  81. "type": "danger",
  82. "msg": str(e)
  83. }
  84. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  85. else:
  86. res = {
  87. "type": "danger",
  88. "msg": "no or invalid id defined"
  89. }
  90. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  91. @app.get("/containers/json")
  92. async def get_containers():
  93. global dockerapi
  94. containers = {}
  95. try:
  96. for container in (await dockerapi.async_docker_client.containers.list()):
  97. container_info = await container.show()
  98. containers.update({container_info['Id']: container_info})
  99. return Response(content=json.dumps(containers, indent=4), media_type="application/json")
  100. except Exception as e:
  101. res = {
  102. "type": "danger",
  103. "msg": str(e)
  104. }
  105. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  106. @app.post("/containers/{container_id}/{post_action}")
  107. async def post_containers(container_id : str, post_action : str, request: Request):
  108. global dockerapi
  109. try:
  110. request_json = await request.json()
  111. except Exception as err:
  112. request_json = {}
  113. if container_id and container_id.isalnum() and post_action:
  114. try:
  115. """Dispatch container_post api call"""
  116. if post_action == 'exec':
  117. if not request_json or not 'cmd' in request_json:
  118. res = {
  119. "type": "danger",
  120. "msg": "cmd is missing"
  121. }
  122. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  123. if not request_json or not 'task' in request_json:
  124. res = {
  125. "type": "danger",
  126. "msg": "task is missing"
  127. }
  128. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  129. api_call_method_name = '__'.join(['container_post', str(post_action), str(request_json['cmd']), str(request_json['task']) ])
  130. else:
  131. api_call_method_name = '__'.join(['container_post', str(post_action) ])
  132. api_call_method = getattr(dockerapi, api_call_method_name, lambda container_id: Response(content=json.dumps({'type': 'danger', 'msg':'container_post - unknown api call' }, indent=4), media_type="application/json"))
  133. dockerapi.logger.info("api call: %s, container_id: %s" % (api_call_method_name, container_id))
  134. return api_call_method(request_json, container_id=container_id)
  135. except Exception as e:
  136. dockerapi.logger.error("error - container_post: %s" % str(e))
  137. res = {
  138. "type": "danger",
  139. "msg": str(e)
  140. }
  141. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  142. else:
  143. res = {
  144. "type": "danger",
  145. "msg": "invalid container id or missing action"
  146. }
  147. return Response(content=json.dumps(res, indent=4), media_type="application/json")
  148. @app.post("/container/{container_id}/stats/update")
  149. async def post_container_update_stats(container_id : str):
  150. global dockerapi
  151. # start update task for container if no task is running
  152. if container_id not in dockerapi.containerIds_to_update:
  153. asyncio.create_task(dockerapi.get_container_stats(container_id))
  154. dockerapi.containerIds_to_update.append(container_id)
  155. while True:
  156. if await dockerapi.redis_client.exists(container_id + '_stats'):
  157. break
  158. await asyncio.sleep(1.5)
  159. stats = json.loads(await dockerapi.redis_client.get(container_id + '_stats'))
  160. return Response(content=json.dumps(stats, indent=4), media_type="application/json")
  161. # PubSub Handler
  162. async def handle_pubsub_messages(channel: aioredis.client.PubSub):
  163. global dockerapi
  164. while True:
  165. try:
  166. async with async_timeout.timeout(60):
  167. message = await channel.get_message(ignore_subscribe_messages=True, timeout=30)
  168. if message is not None:
  169. # Parse message
  170. data_json = json.loads(message['data'].decode('utf-8'))
  171. dockerapi.logger.info(f"PubSub Received - {json.dumps(data_json)}")
  172. # Handle api_call
  173. if 'api_call' in data_json:
  174. # api_call: container_post
  175. if data_json['api_call'] == "container_post":
  176. if 'post_action' in data_json and 'container_name' in data_json:
  177. try:
  178. """Dispatch container_post api call"""
  179. request_json = {}
  180. if data_json['post_action'] == 'exec':
  181. if 'request' in data_json:
  182. request_json = data_json['request']
  183. if 'cmd' in request_json:
  184. if 'task' in request_json:
  185. api_call_method_name = '__'.join(['container_post', str(data_json['post_action']), str(request_json['cmd']), str(request_json['task']) ])
  186. else:
  187. dockerapi.logger.error("api call: task missing")
  188. else:
  189. dockerapi.logger.error("api call: cmd missing")
  190. else:
  191. dockerapi.logger.error("api call: request missing")
  192. else:
  193. api_call_method_name = '__'.join(['container_post', str(data_json['post_action'])])
  194. if api_call_method_name:
  195. api_call_method = getattr(dockerapi, api_call_method_name)
  196. if api_call_method:
  197. dockerapi.logger.info("api call: %s, container_name: %s" % (api_call_method_name, data_json['container_name']))
  198. api_call_method(request_json, container_name=data_json['container_name'])
  199. else:
  200. dockerapi.logger.error("api call not found: %s, container_name: %s" % (api_call_method_name, data_json['container_name']))
  201. except Exception as e:
  202. dockerapi.logger.error("container_post: %s" % str(e))
  203. else:
  204. dockerapi.logger.error("api call: missing container_name, post_action or request")
  205. else:
  206. dockerapi.logger.error("Unknown PubSub received - %s" % json.dumps(data_json))
  207. else:
  208. dockerapi.logger.error("Unknown PubSub received - %s" % json.dumps(data_json))
  209. await asyncio.sleep(0.0)
  210. except asyncio.TimeoutError:
  211. pass
  212. if __name__ == '__main__':
  213. uvicorn.run(
  214. app,
  215. host="0.0.0.0",
  216. port=443,
  217. ssl_certfile="/app/dockerapi_cert.pem",
  218. ssl_keyfile="/app/dockerapi_key.pem",
  219. log_level="info",
  220. loop="none"
  221. )