Dovecot.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import os
  2. from modules.Docker import Docker
  3. class Dovecot:
  4. def __init__(self):
  5. self.docker = Docker()
  6. def decryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
  7. """
  8. Decrypt files in /var/vmail using doveadm if they are encrypted.
  9. :param output_dir: Directory inside the Dovecot container to store decrypted files, Default overwrite.
  10. """
  11. private_key = "/mail_crypt/ecprivkey.pem"
  12. public_key = "/mail_crypt/ecpubkey.pem"
  13. if output_dir:
  14. # Ensure the output directory exists inside the container
  15. mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c 'mkdir -p {output_dir} && chown vmail:vmail {output_dir}'")
  16. if mkdir_result.get("status") != "success":
  17. print(f"Error creating output directory: {mkdir_result.get('output')}")
  18. return
  19. find_command = [
  20. "find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
  21. ]
  22. try:
  23. find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
  24. if find_result.get("status") != "success":
  25. print(f"Error finding files: {find_result.get('output')}")
  26. return
  27. files = find_result.get("output", "").splitlines()
  28. for file in files:
  29. head_command = f"head -c7 {file}"
  30. head_result = self.docker.exec_command("dovecot-mailcow", head_command)
  31. if head_result.get("status") == "success" and head_result.get("output", "").strip() == "CRYPTED":
  32. if output_dir:
  33. # Preserve the directory structure in the output directory
  34. relative_path = os.path.relpath(file, source_dir)
  35. output_file = os.path.join(output_dir, relative_path)
  36. current_path = output_dir
  37. for part in os.path.dirname(relative_path).split(os.sep):
  38. current_path = os.path.join(current_path, part)
  39. mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
  40. if mkdir_result.get("status") != "success":
  41. print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
  42. continue
  43. else:
  44. # Overwrite the original file
  45. output_file = file
  46. decrypt_command = (
  47. f"bash -c 'doveadm fs get compress lz4:1:crypt:private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} > {output_file}'"
  48. )
  49. decrypt_result = self.docker.exec_command("dovecot-mailcow", decrypt_command)
  50. if decrypt_result.get("status") == "success":
  51. print(f"Decrypted {file}")
  52. # Verify the file size and set permissions
  53. size_check_command = f"bash -c '[ -s {output_file} ] && chmod 600 {output_file} && chown vmail:vmail {output_file} || rm -f {output_file}'"
  54. size_check_result = self.docker.exec_command("dovecot-mailcow", size_check_command)
  55. if size_check_result.get("status") != "success":
  56. print(f"Error setting permissions for {output_file}: {size_check_result.get('output')}\n")
  57. except Exception as e:
  58. print(f"Error during decryption: {e}")
  59. return "Done"
  60. def encryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
  61. """
  62. Encrypt files in /var/vmail using doveadm if they are not already encrypted.
  63. :param source_dir: Directory inside the Dovecot container to encrypt files.
  64. :param output_dir: Directory inside the Dovecot container to store encrypted files, Default overwrite.
  65. """
  66. private_key = "/mail_crypt/ecprivkey.pem"
  67. public_key = "/mail_crypt/ecpubkey.pem"
  68. if output_dir:
  69. # Ensure the output directory exists inside the container
  70. mkdir_result = self.docker.exec_command("dovecot-mailcow", f"mkdir -p {output_dir}")
  71. if mkdir_result.get("status") != "success":
  72. print(f"Error creating output directory: {mkdir_result.get('output')}")
  73. return
  74. find_command = [
  75. "find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
  76. ]
  77. try:
  78. find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
  79. if find_result.get("status") != "success":
  80. print(f"Error finding files: {find_result.get('output')}")
  81. return
  82. files = find_result.get("output", "").splitlines()
  83. for file in files:
  84. head_command = f"head -c7 {file}"
  85. head_result = self.docker.exec_command("dovecot-mailcow", head_command)
  86. if head_result.get("status") == "success" and head_result.get("output", "").strip() != "CRYPTED":
  87. if output_dir:
  88. # Preserve the directory structure in the output directory
  89. relative_path = os.path.relpath(file, source_dir)
  90. output_file = os.path.join(output_dir, relative_path)
  91. current_path = output_dir
  92. for part in os.path.dirname(relative_path).split(os.sep):
  93. current_path = os.path.join(current_path, part)
  94. mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
  95. if mkdir_result.get("status") != "success":
  96. print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
  97. continue
  98. else:
  99. # Overwrite the original file
  100. output_file = file
  101. encrypt_command = (
  102. f"bash -c 'doveadm fs put crypt private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} {output_file}'"
  103. )
  104. encrypt_result = self.docker.exec_command("dovecot-mailcow", encrypt_command)
  105. if encrypt_result.get("status") == "success":
  106. print(f"Encrypted {file}")
  107. # Set permissions
  108. permissions_command = f"bash -c 'chmod 600 {output_file} && chown 5000:5000 {output_file}'"
  109. permissions_result = self.docker.exec_command("dovecot-mailcow", permissions_command)
  110. if permissions_result.get("status") != "success":
  111. print(f"Error setting permissions for {output_file}: {permissions_result.get('output')}\n")
  112. except Exception as e:
  113. print(f"Error during encryption: {e}")
  114. return "Done"
  115. def listDeletedMaildirs(self, source_dir="/var/vmail/_garbage"):
  116. """
  117. List deleted maildirs in the specified garbage directory.
  118. :param source_dir: Directory to search for deleted maildirs.
  119. :return: List of maildirs.
  120. """
  121. list_command = ["bash", "-c", f"ls -la {source_dir}"]
  122. try:
  123. result = self.docker.exec_command("dovecot-mailcow", list_command)
  124. if result.get("status") != "success":
  125. print(f"Error listing deleted maildirs: {result.get('output')}")
  126. return []
  127. lines = result.get("output", "").splitlines()
  128. maildirs = {}
  129. for idx, line in enumerate(lines):
  130. parts = line.split()
  131. if "_" in line:
  132. folder_name = parts[-1]
  133. time, maildir = folder_name.split("_", 1)
  134. if maildir.endswith("_index"):
  135. main_item = maildir[:-6]
  136. if main_item in maildirs:
  137. maildirs[main_item]["has_index"] = True
  138. else:
  139. maildirs[maildir] = {"item": idx, "time": time, "name": maildir, "has_index": False}
  140. return list(maildirs.values())
  141. except Exception as e:
  142. print(f"Error during listing deleted maildirs: {e}")
  143. return []
  144. def restoreMaildir(self, username, item, source_dir="/var/vmail/_garbage"):
  145. """
  146. Restore a maildir item for a specific user from the deleted maildirs.
  147. :param username: Username to restore the item to.
  148. :param item: Item to restore (e.g., mailbox, folder).
  149. :param source_dir: Directory containing deleted maildirs.
  150. :return: Response from Dovecot.
  151. """
  152. username_splitted = username.split("@")
  153. maildirs = self.listDeletedMaildirs()
  154. maildir = None
  155. for mdir in maildirs:
  156. if mdir["item"] == int(item):
  157. maildir = mdir
  158. break
  159. if not maildir:
  160. return {"status": "error", "message": "Maildir not found."}
  161. restore_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']} /var/vmail/{username_splitted[1]}/{username_splitted[0]}"
  162. restore_index_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']}_index /var/vmail_index/{username}"
  163. result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_command])
  164. if result.get("status") != "success":
  165. return {"status": "error", "message": "Failed to restore maildir."}
  166. result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_index_command])
  167. if result.get("status") != "success":
  168. return {"status": "error", "message": "Failed to restore maildir index."}
  169. return "Done"