123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- import os
- from modules.Docker import Docker
- class Dovecot:
- def __init__(self):
- self.docker = Docker()
- def decryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
- """
- Decrypt files in /var/vmail using doveadm if they are encrypted.
- :param output_dir: Directory inside the Dovecot container to store decrypted files, Default overwrite.
- """
- private_key = "/mail_crypt/ecprivkey.pem"
- public_key = "/mail_crypt/ecpubkey.pem"
- if output_dir:
- # Ensure the output directory exists inside the container
- mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c 'mkdir -p {output_dir} && chown vmail:vmail {output_dir}'")
- if mkdir_result.get("status") != "success":
- print(f"Error creating output directory: {mkdir_result.get('output')}")
- return
- find_command = [
- "find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
- ]
- try:
- find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
- if find_result.get("status") != "success":
- print(f"Error finding files: {find_result.get('output')}")
- return
- files = find_result.get("output", "").splitlines()
- for file in files:
- head_command = f"head -c7 {file}"
- head_result = self.docker.exec_command("dovecot-mailcow", head_command)
- if head_result.get("status") == "success" and head_result.get("output", "").strip() == "CRYPTED":
- if output_dir:
- # Preserve the directory structure in the output directory
- relative_path = os.path.relpath(file, source_dir)
- output_file = os.path.join(output_dir, relative_path)
- current_path = output_dir
- for part in os.path.dirname(relative_path).split(os.sep):
- current_path = os.path.join(current_path, part)
- mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
- if mkdir_result.get("status") != "success":
- print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
- continue
- else:
- # Overwrite the original file
- output_file = file
- decrypt_command = (
- f"bash -c 'doveadm fs get compress lz4:1:crypt:private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} > {output_file}'"
- )
- decrypt_result = self.docker.exec_command("dovecot-mailcow", decrypt_command)
- if decrypt_result.get("status") == "success":
- print(f"Decrypted {file}")
- # Verify the file size and set permissions
- size_check_command = f"bash -c '[ -s {output_file} ] && chmod 600 {output_file} && chown vmail:vmail {output_file} || rm -f {output_file}'"
- size_check_result = self.docker.exec_command("dovecot-mailcow", size_check_command)
- if size_check_result.get("status") != "success":
- print(f"Error setting permissions for {output_file}: {size_check_result.get('output')}\n")
- except Exception as e:
- print(f"Error during decryption: {e}")
- return "Done"
- def encryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
- """
- Encrypt files in /var/vmail using doveadm if they are not already encrypted.
- :param source_dir: Directory inside the Dovecot container to encrypt files.
- :param output_dir: Directory inside the Dovecot container to store encrypted files, Default overwrite.
- """
- private_key = "/mail_crypt/ecprivkey.pem"
- public_key = "/mail_crypt/ecpubkey.pem"
- if output_dir:
- # Ensure the output directory exists inside the container
- mkdir_result = self.docker.exec_command("dovecot-mailcow", f"mkdir -p {output_dir}")
- if mkdir_result.get("status") != "success":
- print(f"Error creating output directory: {mkdir_result.get('output')}")
- return
- find_command = [
- "find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
- ]
- try:
- find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
- if find_result.get("status") != "success":
- print(f"Error finding files: {find_result.get('output')}")
- return
- files = find_result.get("output", "").splitlines()
- for file in files:
- head_command = f"head -c7 {file}"
- head_result = self.docker.exec_command("dovecot-mailcow", head_command)
- if head_result.get("status") == "success" and head_result.get("output", "").strip() != "CRYPTED":
- if output_dir:
- # Preserve the directory structure in the output directory
- relative_path = os.path.relpath(file, source_dir)
- output_file = os.path.join(output_dir, relative_path)
- current_path = output_dir
- for part in os.path.dirname(relative_path).split(os.sep):
- current_path = os.path.join(current_path, part)
- mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
- if mkdir_result.get("status") != "success":
- print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
- continue
- else:
- # Overwrite the original file
- output_file = file
- encrypt_command = (
- f"bash -c 'doveadm fs put crypt private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} {output_file}'"
- )
- encrypt_result = self.docker.exec_command("dovecot-mailcow", encrypt_command)
- if encrypt_result.get("status") == "success":
- print(f"Encrypted {file}")
- # Set permissions
- permissions_command = f"bash -c 'chmod 600 {output_file} && chown 5000:5000 {output_file}'"
- permissions_result = self.docker.exec_command("dovecot-mailcow", permissions_command)
- if permissions_result.get("status") != "success":
- print(f"Error setting permissions for {output_file}: {permissions_result.get('output')}\n")
- except Exception as e:
- print(f"Error during encryption: {e}")
- return "Done"
- def listDeletedMaildirs(self, source_dir="/var/vmail/_garbage"):
- """
- List deleted maildirs in the specified garbage directory.
- :param source_dir: Directory to search for deleted maildirs.
- :return: List of maildirs.
- """
- list_command = ["bash", "-c", f"ls -la {source_dir}"]
- try:
- result = self.docker.exec_command("dovecot-mailcow", list_command)
- if result.get("status") != "success":
- print(f"Error listing deleted maildirs: {result.get('output')}")
- return []
- lines = result.get("output", "").splitlines()
- maildirs = {}
- for idx, line in enumerate(lines):
- parts = line.split()
- if "_" in line:
- folder_name = parts[-1]
- time, maildir = folder_name.split("_", 1)
- if maildir.endswith("_index"):
- main_item = maildir[:-6]
- if main_item in maildirs:
- maildirs[main_item]["has_index"] = True
- else:
- maildirs[maildir] = {"item": idx, "time": time, "name": maildir, "has_index": False}
- return list(maildirs.values())
- except Exception as e:
- print(f"Error during listing deleted maildirs: {e}")
- return []
- def restoreMaildir(self, username, item, source_dir="/var/vmail/_garbage"):
- """
- Restore a maildir item for a specific user from the deleted maildirs.
- :param username: Username to restore the item to.
- :param item: Item to restore (e.g., mailbox, folder).
- :param source_dir: Directory containing deleted maildirs.
- :return: Response from Dovecot.
- """
- username_splitted = username.split("@")
- maildirs = self.listDeletedMaildirs()
- maildir = None
- for mdir in maildirs:
- if mdir["item"] == int(item):
- maildir = mdir
- break
- if not maildir:
- return {"status": "error", "message": "Maildir not found."}
- restore_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']} /var/vmail/{username_splitted[1]}/{username_splitted[0]}"
- restore_index_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']}_index /var/vmail_index/{username}"
- result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_command])
- if result.get("status") != "success":
- return {"status": "error", "message": "Failed to restore maildir."}
- result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_index_command])
- if result.get("status") != "success":
- return {"status": "error", "message": "Failed to restore maildir index."}
- return "Done"
|