123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import docker
- from docker.errors import APIError
- class Docker:
- def __init__(self):
- self.client = docker.from_env()
- def exec_command(self, container_name, cmd, user=None):
- """
- Execute a command in a container by its container name.
- :param container_name: The name of the container.
- :param cmd: The command to execute as a list (e.g., ["ls", "-la"]).
- :param user: The user to execute the command as (optional).
- :return: A standardized response with status, output, and exit_code.
- """
- filters = {"name": container_name}
- try:
- for container in self.client.containers.list(filters=filters):
- exec_result = container.exec_run(cmd, user=user)
- return {
- "status": "success",
- "exit_code": exec_result.exit_code,
- "output": exec_result.output.decode("utf-8")
- }
- except APIError as e:
- return {
- "status": "error",
- "exit_code": "APIError",
- "output": str(e)
- }
- except Exception as e:
- return {
- "status": "error",
- "exit_code": "Exception",
- "output": str(e)
- }
- def start_container(self, container_name):
- """
- Start a container by its container name.
- :param container_name: The name of the container.
- :return: A standardized response with status, output, and exit_code.
- """
- filters = {"name": container_name}
- try:
- for container in self.client.containers.list(filters=filters):
- container.start()
- return {
- "status": "success",
- "exit_code": "0",
- "output": f"Container '{container_name}' started successfully."
- }
- except APIError as e:
- return {
- "status": "error",
- "exit_code": "APIError",
- "output": str(e)
- }
- except Exception as e:
- return {
- "status": "error",
- "error_type": "Exception",
- "output": str(e)
- }
- def stop_container(self, container_name):
- """
- Stop a container by its container name.
- :param container_name: The name of the container.
- :return: A standardized response with status, output, and exit_code.
- """
- filters = {"name": container_name}
- try:
- for container in self.client.containers.list(filters=filters):
- container.stop()
- return {
- "status": "success",
- "exit_code": "0",
- "output": f"Container '{container_name}' stopped successfully."
- }
- except APIError as e:
- return {
- "status": "error",
- "exit_code": "APIError",
- "output": str(e)
- }
- except Exception as e:
- return {
- "status": "error",
- "exit_code": "Exception",
- "output": str(e)
- }
- def restart_container(self, container_name):
- """
- Restart a container by its container name.
- :param container_name: The name of the container.
- :return: A standardized response with status, output, and exit_code.
- """
- filters = {"name": container_name}
- try:
- for container in self.client.containers.list(filters=filters):
- container.restart()
- return {
- "status": "success",
- "exit_code": "0",
- "output": f"Container '{container_name}' restarted successfully."
- }
- except APIError as e:
- return {
- "status": "error",
- "exit_code": "APIError",
- "output": str(e)
- }
- except Exception as e:
- return {
- "status": "error",
- "exit_code": "Exception",
- "output": str(e)
- }
|