quarantine_notify.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #!/usr/bin/python3
  2. import smtplib
  3. import os
  4. import sys
  5. import MySQLdb
  6. from email.mime.multipart import MIMEMultipart
  7. from email.mime.text import MIMEText
  8. from email.utils import COMMASPACE, formatdate
  9. import jinja2
  10. from jinja2 import TemplateError
  11. from jinja2.sandbox import SandboxedEnvironment
  12. import json
  13. import redis
  14. import time
  15. import html2text
  16. import socket
  17. pid = str(os.getpid())
  18. pidfile = "/tmp/quarantine_notify.pid"
  19. if os.path.isfile(pidfile):
  20. print("%s already exists, exiting" % (pidfile))
  21. sys.exit()
  22. pid = str(os.getpid())
  23. f = open(pidfile, 'w')
  24. f.write(pid)
  25. f.close()
  26. try:
  27. while True:
  28. try:
  29. r = redis.StrictRedis(host='redis', decode_responses=True, port=6379, db=0, password=os.environ['REDISPASS'])
  30. r.ping()
  31. except Exception as ex:
  32. print('%s - trying again...' % (ex))
  33. time.sleep(3)
  34. else:
  35. break
  36. time_now = int(time.time())
  37. mailcow_hostname = os.environ.get('MAILCOW_HOSTNAME')
  38. max_score = float(r.get('Q_MAX_SCORE') or "9999.0")
  39. if max_score == "":
  40. max_score = 9999.0
  41. def query_mysql(query, headers = True, update = False):
  42. while True:
  43. try:
  44. cnx = MySQLdb.connect(user=os.environ.get('DBUSER'), password=os.environ.get('DBPASS'), database=os.environ.get('DBNAME'), charset="utf8mb4", collation="utf8mb4_general_ci")
  45. except Exception as ex:
  46. print('%s - trying again...' % (ex))
  47. time.sleep(3)
  48. else:
  49. break
  50. cur = cnx.cursor()
  51. cur.execute(query)
  52. if not update:
  53. result = []
  54. columns = tuple( [d[0] for d in cur.description] )
  55. for row in cur:
  56. if headers:
  57. result.append(dict(list(zip(columns, row))))
  58. else:
  59. result.append(row)
  60. cur.close()
  61. cnx.close()
  62. return result
  63. else:
  64. cnx.commit()
  65. cur.close()
  66. cnx.close()
  67. def notify_rcpt(rcpt, msg_count, quarantine_acl, category):
  68. if category == "add_header": category = "add header"
  69. meta_query = query_mysql('SELECT SHA2(CONCAT(id, qid), 256) AS qhash, id, subject, score, sender, created, action FROM quarantine WHERE notified = 0 AND rcpt = "%s" AND score < %f AND (action = "%s" OR "all" = "%s")' % (rcpt, max_score, category, category))
  70. print("%s: %d of %d messages qualify for notification" % (rcpt, len(meta_query), msg_count))
  71. if len(meta_query) == 0:
  72. return
  73. msg_count = len(meta_query)
  74. env = SandboxedEnvironment()
  75. if r.get('Q_HTML'):
  76. try:
  77. template = env.from_string(r.get('Q_HTML'))
  78. except Exception:
  79. print("Error: Cannot parse quarantine template, falling back to default template.")
  80. with open('/templates/quarantine.tpl') as file_:
  81. template = env.from_string(file_.read())
  82. else:
  83. with open('/templates/quarantine.tpl') as file_:
  84. template = env.from_string(file_.read())
  85. try:
  86. html = template.render(meta=meta_query, username=rcpt, counter=msg_count, hostname=mailcow_hostname, quarantine_acl=quarantine_acl)
  87. except (jinja2.exceptions.SecurityError, TemplateError) as ex:
  88. print(f"SecurityError or TemplateError in template rendering: {ex}")
  89. return
  90. text = html2text.html2text(html)
  91. count = 0
  92. while count < 15:
  93. count += 1
  94. try:
  95. server = smtplib.SMTP('postfix', 590, 'quarantine')
  96. server.ehlo()
  97. msg = MIMEMultipart('alternative')
  98. msg_from = r.get('Q_SENDER') or "quarantine@localhost"
  99. # Remove non-ascii chars from field
  100. msg['From'] = ''.join([i if ord(i) < 128 else '' for i in msg_from])
  101. msg['Subject'] = r.get('Q_SUBJ') or "Spam Quarantine Notification"
  102. msg['Date'] = formatdate(localtime = True)
  103. text_part = MIMEText(text, 'plain', 'utf-8')
  104. html_part = MIMEText(html, 'html', 'utf-8')
  105. msg.attach(text_part)
  106. msg.attach(html_part)
  107. msg['To'] = str(rcpt)
  108. bcc = r.get('Q_BCC') or ""
  109. redirect = r.get('Q_REDIRECT') or ""
  110. text = msg.as_string()
  111. if bcc == '':
  112. if redirect == '':
  113. server.sendmail(msg['From'], str(rcpt), text)
  114. else:
  115. server.sendmail(msg['From'], str(redirect), text)
  116. else:
  117. if redirect == '':
  118. server.sendmail(msg['From'], [str(rcpt)] + [str(bcc)], text)
  119. else:
  120. server.sendmail(msg['From'], [str(redirect)] + [str(bcc)], text)
  121. server.quit()
  122. for res in meta_query:
  123. query_mysql('UPDATE quarantine SET notified = 1 WHERE id = "%d"' % (res['id']), update = True)
  124. r.hset('Q_LAST_NOTIFIED', record['rcpt'], time_now)
  125. break
  126. except Exception as ex:
  127. server.quit()
  128. print('%s' % (ex))
  129. time.sleep(3)
  130. records = query_mysql('SELECT IFNULL(user_acl.quarantine, 0) AS quarantine_acl, count(id) AS counter, rcpt FROM quarantine LEFT OUTER JOIN user_acl ON user_acl.username = rcpt WHERE notified = 0 AND score < %f AND rcpt in (SELECT username FROM mailbox) GROUP BY rcpt' % (max_score))
  131. for record in records:
  132. attrs = ''
  133. attrs_json = ''
  134. time_trans = {
  135. "hourly": 3600,
  136. "daily": 86400,
  137. "weekly": 604800
  138. }
  139. try:
  140. last_notification = int(r.hget('Q_LAST_NOTIFIED', record['rcpt']))
  141. if last_notification > time_now:
  142. print('Last notification is > time now, assuming never')
  143. last_notification = 0
  144. except Exception as ex:
  145. print('Could not determine last notification for %s, assuming never' % (record['rcpt']))
  146. last_notification = 0
  147. attrs_json = query_mysql('SELECT attributes FROM mailbox WHERE username = "%s"' % (record['rcpt']))
  148. attrs = attrs_json[0]['attributes']
  149. if isinstance(attrs, str):
  150. # if attr is str then just load it
  151. attrs = json.loads(attrs)
  152. else:
  153. # if it's bytes then decode and load it
  154. attrs = json.loads(attrs.decode('utf-8'))
  155. if attrs['quarantine_notification'] not in ('hourly', 'daily', 'weekly'):
  156. continue
  157. if last_notification == 0 or (last_notification + time_trans[attrs['quarantine_notification']]) <= time_now:
  158. print("Notifying %s: Considering %d new items in quarantine (policy: %s)" % (record['rcpt'], record['counter'], attrs['quarantine_notification']))
  159. notify_rcpt(record['rcpt'], record['counter'], record['quarantine_acl'], attrs['quarantine_category'])
  160. finally:
  161. os.unlink(pidfile)