瀏覽代碼

[Rspamd] Dynamic ratelimit fixed, removed async redis request; Ready to implement per-user ratelimits via UI (tbd)

andryyy 8 年之前
父節點
當前提交
b1213c51d7

+ 3 - 2
data/Dockerfiles/rspamd/Dockerfile

@@ -8,8 +8,8 @@ RUN apt-get update && apt-get install -y \
 	ca-certificates \
 	gnupg2 \
 	apt-transport-https \
-	&& apt-key adv --fetch-keys https://rspamd.com/apt-stable/gpg.key \
-	&& echo "deb https://rspamd.com/apt-stable/ stretch main" > /etc/apt/sources.list.d/rspamd.list \
+	&& apt-key adv --fetch-keys https://rspamd.com/apt/gpg.key \
+	&& echo "deb https://rspamd.com/apt/ stretch main" > /etc/apt/sources.list.d/rspamd.list \
 	&& apt-get update && apt-get install -y rspamd \
 	&& rm -rf /var/lib/apt/lists/* \
 	&& echo '.include $LOCAL_CONFDIR/local.d/rspamd.conf.local' > /etc/rspamd/rspamd.conf.local \
@@ -20,6 +20,7 @@ RUN apt-get update && apt-get install -y \
 
 COPY settings.conf /etc/rspamd/modules.d/settings.conf
 COPY ratelimit.lua /usr/share/rspamd/lua/ratelimit.lua
+COPY lua_util.lua /usr/share/rspamd/lib/lua_util.lua
 COPY docker-entrypoint.sh /docker-entrypoint.sh
 
 ENTRYPOINT ["/docker-entrypoint.sh"]

+ 152 - 0
data/Dockerfiles/rspamd/lua_util.lua

@@ -0,0 +1,152 @@
+local exports = {}
+local lpeg = require 'lpeg'
+
+local split_grammar = {}
+local function rspamd_str_split(s, sep)
+  local gr = split_grammar[sep]
+
+  if not gr then
+    local _sep = lpeg.P(sep)
+    local elem = lpeg.C((1 - _sep)^0)
+    local p = lpeg.Ct(elem * (_sep * elem)^0)
+    gr = p
+    split_grammar[sep] = gr
+  end
+
+  return gr:match(s)
+end
+
+exports.rspamd_str_split = rspamd_str_split
+
+local space = lpeg.S' \t\n\v\f\r'
+local nospace = 1 - space
+local ptrim = space^0 * lpeg.C((space^0 * nospace^1)^0)
+local match = lpeg.match
+exports.rspamd_str_trim = function(s)
+  return match(ptrim, s)
+end
+
+-- Robert Jay Gould http://lua-users.org/wiki/SimpleRound
+exports.round = function(num, numDecimalPlaces)
+  local mult = 10^(numDecimalPlaces or 0)
+  return math.floor(num * mult) / mult
+end
+
+exports.template = function(tmpl, keys)
+  local var_lit = lpeg.P { lpeg.R("az") + lpeg.R("AZ") + lpeg.R("09") + "_" }
+  local var = lpeg.P { (lpeg.P("$") / "") * ((var_lit^1) / keys) }
+  local var_braced = lpeg.P { (lpeg.P("${") / "") * ((var_lit^1) / keys) * (lpeg.P("}") / "") }
+
+  local template_grammar = lpeg.Cs((var + var_braced + 1)^0)
+
+  return lpeg.match(template_grammar, tmpl)
+end
+
+exports.remove_email_aliases = function(email_addr)
+  local function check_gmail_user(addr)
+    -- Remove all points
+    local no_dots_user = string.gsub(addr.user, '%.', '')
+    local cap, pluses = string.match(no_dots_user, '^([^%+][^%+]*)(%+.*)$')
+    if cap then
+      return cap, rspamd_str_split(pluses, '+'), nil
+    elseif no_dots_user ~= addr.user then
+      return no_dots_user,{},nil
+    end
+
+    return nil
+  end
+
+  local function check_address(addr)
+    if addr.user then
+      local cap, pluses = string.match(addr.user, '^([^%+][^%+]*)(%+.*)$')
+      if cap then
+        return cap, rspamd_str_split(pluses, '+'), nil
+      end
+    end
+
+    return nil
+  end
+
+  local function set_addr(addr, new_user, new_domain)
+    if new_user then
+      addr.user = new_user
+    end
+    if new_domain then
+      addr.domain = new_domain
+    end
+
+    if addr.domain then
+      addr.addr = string.format('%s@%s', addr.user, addr.domain)
+    else
+      addr.addr = string.format('%s@', addr.user)
+    end
+
+    if addr.name and #addr.name > 0 then
+      addr.raw = string.format('"%s" <%s>', addr.name, addr.addr)
+    else
+      addr.raw = string.format('<%s>', addr.addr)
+    end
+  end
+
+  local function check_gmail(addr)
+    local nu, tags, nd = check_gmail_user(addr)
+
+    if nu then
+      return nu, tags, nd
+    end
+
+    return nil
+  end
+
+  local function check_googlemail(addr)
+    local nd = 'gmail.com'
+    local nu, tags = check_gmail_user(addr)
+
+    if nu then
+      return nu, tags, nd
+    end
+
+    return nil, nil, nd
+  end
+
+  local specific_domains = {
+    ['gmail.com'] = check_gmail,
+    ['googlemail.com'] = check_googlemail,
+  }
+
+  if email_addr then
+    if email_addr.domain and specific_domains[email_addr.domain] then
+      local nu, tags, nd = specific_domains[email_addr.domain](email_addr)
+      if nu or nd then
+        set_addr(email_addr, nu, nd)
+
+        return nu, tags
+      end
+    else
+      local nu, tags, nd = check_address(email_addr)
+      if nu or nd then
+        set_addr(email_addr, nu, nd)
+
+        return nu, tags
+      end
+    end
+
+    return nil
+  end
+end
+
+exports.is_rspamc_or_controller = function(task)
+  local ua = task:get_request_header('User-Agent') or ''
+  local pwd = task:get_request_header('Password')
+  local is_rspamc = false
+  if tostring(ua) == 'rspamc' or pwd then is_rspamc = true end
+
+  return is_rspamc
+end
+
+local unpack_function = table.unpack or unpack
+exports.unpack = function(t)
+  return unpack_function(t)
+end
+
+return exports

+ 328 - 322
data/Dockerfiles/rspamd/ratelimit.lua

@@ -1,5 +1,6 @@
 --[[
-Copyright (c) 2011-2015, Vsevolod Stakhov <vsevolod@highsecure.ru>
+Copyright (c) 2011-2017, Vsevolod Stakhov <vsevolod@highsecure.ru>
+Copyright (c) 2016-2017, Andrew Lewis <nerf@judo.za.org>
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -18,13 +19,10 @@ if confighelp then
   return
 end
 
--- A plugin that implements ratelimits using redis or kvstorage server
+-- A plugin that implements ratelimits using redis
 
-local E = {}
-
--- Default settings for limits, 1-st member is burst, second is rate and the third is numeric type
-local settings = {
-}
+local E, settings = {}, {}
+local N = 'ratelimit'
 -- Senders that are considered as bounce
 local bounce_senders = {'postmaster', 'mailer-daemon', '', 'null', 'fetchmail-daemon', 'mdaemon'}
 -- Do not check ratelimits for these recipients
@@ -35,12 +33,12 @@ local max_rcpt = 5
 local redis_params
 local ratelimit_symbol
 -- Do not delay mail after 1 day
-local max_delay = 24 * 3600
 local use_ip_score = false
-local rl_prefix = 'rl'
+local rl_prefix = 'RL'
 local ip_score_lower_bound = 10
 local ip_score_ham_multiplier = 1.1
 local ip_score_spam_divisor = 1.1
+local limits_hash
 
 local message_func = function(_, limit_type)
   return string.format('Ratelimit "%s" exceeded', limit_type)
@@ -49,12 +47,143 @@ end
 local rspamd_logger = require "rspamd_logger"
 local rspamd_util = require "rspamd_util"
 local rspamd_lua_utils = require "lua_util"
+local lua_redis = require "lua_redis"
 local fun = require "fun"
 
 local user_keywords = {'user'}
 
+local redis_script_sha
+local redis_script = [[local bucket
+local limited = false
+local buckets = {}
+local queue_id = table.remove(ARGV)
+local now = table.remove(ARGV)
+
+local argi = 0
+for i = 1, #KEYS do
+  local key = KEYS[i]
+  local period = tonumber(ARGV[argi+1])
+  local limit = tonumber(ARGV[argi+2])
+  if not buckets[key] then
+    buckets[key] = {
+      max_period = period,
+      limits = { {period, limit} },
+    }
+  else
+    table.insert(buckets[key].limits, {period, limit})
+    if period > buckets[key].max_period then
+      buckets[key].max_period = period
+    end
+  end
+  argi = argi + 2
+end
+
+for k, v in pairs(buckets) do
+  local maxp = v.max_period
+  redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
+  for _, lim in ipairs(v.limits) do
+    local period = lim[1]
+    local limit = lim[2]
+    local rate
+    if period == maxp then
+      rate = redis.call('ZCARD', k)
+    else
+      rate = redis.call('ZCOUNT', k, now - period, '+inf')
+    end
+    if rate and rate >= limit then
+      limited = true
+      bucket = k
+    end
+  end
+  redis.call('EXPIRE', k, maxp)
+  if limited then break end
+end
+
+if not limited then
+  for k in pairs(buckets) do
+    redis.call('ZADD', k, now, queue_id)
+  end
+end
+
+return {limited, bucket}]]
+
+local redis_script_symbol = [[local limited = false
+local buckets, results = {}, {}
+local queue_id = table.remove(ARGV)
+local now = table.remove(ARGV)
+
+local argi = 0
+for i = 1, #KEYS do
+  local key = KEYS[i]
+  local period = tonumber(ARGV[argi+1])
+  local limit = tonumber(ARGV[argi+2])
+  if not buckets[key] then
+    buckets[key] = {
+      max_period = period,
+      limits = { {period, limit} },
+    }
+  else
+    table.insert(buckets[key].limits, {period, limit})
+    if period > buckets[key].max_period then
+      buckets[key].max_period = period
+    end
+  end
+  argi = argi + 2
+end
+
+for k, v in pairs(buckets) do
+  local maxp = v.max_period
+  redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
+  for _, lim in ipairs(v.limits) do
+    local period = lim[1]
+    local limit = lim[2]
+    local rate
+    if period == maxp then
+      rate = redis.call('ZCARD', k)
+    else
+      rate = redis.call('ZCOUNT', k, now - period, '+inf')
+    end
+    if rate then
+      local mult = 2 * math.tanh(rate / (limit * 2))
+      if mult >= 0.5 then
+        table.insert(results, {k, tostring(mult)})
+      end
+    end
+  end
+  redis.call('ZADD', k, now, queue_id)
+  redis.call('EXPIRE', k, maxp)
+end
+
+return results]]
+
+local function load_scripts(cfg, ev_base)
+  local function rl_script_cb(err, data)
+    if err then
+      rspamd_logger.errx(cfg, 'Script loading failed: ' .. err)
+    elseif type(data) == 'string' then
+      redis_script_sha = data
+    end
+  end
+  local script
+  if ratelimit_symbol then
+    script = redis_script_symbol
+  else
+    script = redis_script
+  end
+  lua_redis.redis_make_request_taskless(
+    ev_base,
+    cfg,
+    redis_params,
+    nil, -- key
+    true, -- is write
+    rl_script_cb, --callback
+    'SCRIPT', -- command
+    {'LOAD', script}
+  )
+end
+
 local limit_parser
-local function parse_string_limit(lim)
+local function parse_string_limit(lim, no_error)
   local function parse_time_suffix(s)
     if s == 's' then
       return 1
@@ -107,44 +236,14 @@ local function parse_string_limit(lim)
   local t = lpeg.match(limit_parser.limit, lim)
 
   if t and t[1] and t[2] and t[2] ~= 0 then
-    return t[1] / t[2], t[1]
+    return t[2], t[1]
   end
 
-  rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim)
-
-  return nil
-end
-
---- Parse atime and bucket of limit
-local function parse_limits(data)
-  local function parse_limit_elt(str)
-    local elts = rspamd_str_split(str, ':')
-    if not elts or #elts < 2 then
-      return {0, 0, 0}
-    else
-      local atime = tonumber(elts[1])
-      local bucket = tonumber(elts[2])
-      local ctime = atime
-
-      if elts[3] then
-        ctime = tonumber(elts[3])
-      end
-
-      if not ctime then
-        ctime = atime
-      end
-
-      return {atime,bucket,ctime}
-    end
+  if not no_error then
+    rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim)
   end
 
-  return fun.iter(data):map(function(e)
-    if type(e) == 'string' then
-      return parse_limit_elt(e)
-    else
-      return {0, 0, 0}
-    end
-  end):totable()
+  return nil
 end
 
 local function resize_element(x_score, x_total, element)
@@ -191,7 +290,7 @@ local keywords = {
     ['get_value'] = function(task)
       local from = task:get_from(0)
       if ((from or E)[1] or E).addr then
-        return from[1]['addr']
+        return string.lower(from[1]['addr'])
       end
       return nil
     end,
@@ -270,7 +369,7 @@ local function dynamic_rate_key(task, rtype)
     local total_rcpt = 0
     for _, r in ipairs(rcpts) do
       if r['addr'] and total_rcpt < max_rcpt then
-        local key_f = string.format(key_s, r['addr'])
+        local key_f = string.format(key_s, string.lower(r['addr']))
         table.insert(rate_keys, key_f)
         total_rcpt = total_rcpt + 1
       end
@@ -279,194 +378,86 @@ local function dynamic_rate_key(task, rtype)
   end
 end
 
---- Check specific limit inside redis
-local function check_limits(task, args)
-
-  local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args)
-  local ret
-  --- Called when value is got from server
-  local function rate_get_cb(err, data)
+local function process_buckets(task, buckets)
+  if not buckets then return end
+  local function rl_redis_cb(err, data)
     if err then
-      rspamd_logger.infox(task, 'got error while getting limit: %1', err)
+      rspamd_logger.infox(task, 'got error while setting limit: %1', err)
     end
     if not data then return end
-    local ntime = rspamd_util.get_time()
-    local asn_score,total_asn,
-      country_score,total_country,
-      ipnet_score,total_ipnet,
-      ip_score, total_ip
-    if use_ip_score then
-      asn_score,total_asn,
-        country_score,total_country,
-        ipnet_score,total_ipnet,
-        ip_score, total_ip = task:get_mempool():get_variable('ip_score',
-        'double,double,double,double,double,double,double,double')
+    if data[1] == 1 then
+      rspamd_logger.infox(task,
+        'ratelimit "%s" exceeded',
+        data[2])
+      task:set_pre_result('soft reject',
+        message_func(task, data[2]))
     end
-
-    fun.each(function(elt, limit, rtype)
-      local bucket = elt[2]
-      local rate = limit[2]
-      local threshold = limit[1]
-      local atime = elt[1]
-      local ctime = elt[3]
-
-      if atime == 0 then return end
-
-      if use_ip_score then
-        local key_keywords = rspamd_str_split(rtype, '_')
-        local has_asn, has_ip = false, false
-        for _, v in ipairs(key_keywords) do
-          if v == "asn" then has_asn = true end
-          if v == "ip" then has_ip = true end
-          if has_ip and has_asn then break end
-        end
-        if has_asn and not has_ip then
-          bucket = resize_element(asn_score, total_asn, bucket)
-          rate = resize_element(asn_score, total_asn, rate)
-        elseif has_ip then
-          if total_ip and total_ip > ip_score_lower_bound then
-            bucket = resize_element(ip_score, total_ip, bucket)
-            rate = resize_element(ip_score, total_ip, rate)
-          elseif total_ipnet and total_ipnet > ip_score_lower_bound then
-            bucket = resize_element(ipnet_score, total_ipnet, bucket)
-            rate = resize_element(ipnet_score, total_ipnet, rate)
-          elseif total_asn and total_asn > ip_score_lower_bound then
-            bucket = resize_element(asn_score, total_asn, bucket)
-            rate = resize_element(asn_score, total_asn, rate)
-          elseif total_country and total_country > ip_score_lower_bound then
-            bucket = resize_element(country_score, total_country, bucket)
-            rate = resize_element(country_score, total_country, rate)
-          else
-            bucket = resize_element(ip_score, total_ip, bucket)
-            rate = resize_element(ip_score, total_ip, rate)
-          end
-        end
-      end
-
-      if atime - ctime > max_delay then
-        rspamd_logger.infox(task, 'limit is too old: %1 seconds; ignore it',
-          atime - ctime)
-      else
-        bucket = bucket - rate * (ntime - atime);
-        if bucket > 0 then
-          if ratelimit_symbol then
-            local mult = 2 * rspamd_util.tanh(bucket / (threshold * 2))
-
-            if mult > 0.5 then
-              task:insert_result(ratelimit_symbol, mult,
-                rtype .. ':' .. string.format('%.2f', mult))
-            end
-          else
-            if bucket > threshold then
-              rspamd_logger.infox(task,
-                'ratelimit "%s" exceeded: %s elements with %s limit',
-                rtype, bucket, threshold)
-              task:set_pre_result('soft reject',
-                message_func(task, rtype, bucket, threshold))
-            end
-          end
-        end
-      end
-    end, fun.zip(parse_limits(data), fun.map(function(a) return a[1] end, args),
-      fun.map(function(a) return rspamd_str_split(a[2], ":")[2] end, args)))
   end
-
-  ret = rspamd_redis_make_request(task,
-    redis_params, -- connect params
-    key, -- hash key
-    false, -- is write
-    rate_get_cb, --callback
-    'mget', -- command
-    fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments
-  )
-  if not ret then
-    rspamd_logger.errx(task, 'got error connecting to redis')
-  end
-end
-
---- Set specific limit inside redis
-local function set_limits(task, args)
-  local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args)
-  local ret, upstream
-
-  local function rate_set_cb(err)
-    if err then
-      rspamd_logger.infox(task, 'got error %s when setting ratelimit record on server %s',
-        err, upstream:get_addr())
-    end
-  end
-  local function rate_get_cb(err, data)
+  local function rl_symbol_redis_cb(err, data)
     if err then
       rspamd_logger.infox(task, 'got error while setting limit: %1', err)
     end
     if not data then return end
-    local ntime = rspamd_util.get_time()
-    local values = {}
-    fun.each(function(elt, limit)
-      local bucket = elt[2]
-      local rate = limit[1][2]
-      local atime = elt[1]
-      local ctime = elt[3]
-
-      if atime - ctime > max_delay then
-        rspamd_logger.infox(task, 'limit is too old: %1 seconds; start it over',
-          atime - ctime)
-        bucket = 1
-        ctime = ntime
-      else
-        if bucket > 0 then
-          bucket = bucket - rate * (ntime - atime) + 1;
-          if bucket < 0 then
-            bucket = 1
-          end
+    for i, b in ipairs(data) do
+      task:insert_result(ratelimit_symbol, b[2], string.format('%s:%s:%s', i, b[1], b[2]))
+    end
+  end
+  local redis_cb = rl_redis_cb
+  if ratelimit_symbol then redis_cb = rl_symbol_redis_cb end
+  local args = {redis_script_sha, #buckets}
+  for _, bucket in ipairs(buckets) do
+    table.insert(args, bucket[2])
+  end
+  for _, bucket in ipairs(buckets) do
+    if use_ip_score then
+      local asn_score,total_asn,
+        country_score,total_country,
+        ipnet_score,total_ipnet,
+        ip_score, total_ip = task:get_mempool():get_variable('ip_score',
+        'double,double,double,double,double,double,double,double')
+      local key_keywords = rspamd_str_split(bucket[2], '_')
+      local has_asn, has_ip = false, false
+      for _, v in ipairs(key_keywords) do
+        if v == "asn" then has_asn = true end
+        if v == "ip" then has_ip = true end
+        if has_ip and has_asn then break end
+      end
+      if has_asn and not has_ip then
+        bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2])
+      elseif has_ip then
+        if total_ip and total_ip > ip_score_lower_bound then
+          bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2])
+        elseif total_ipnet and total_ipnet > ip_score_lower_bound then
+          bucket[1][2] = resize_element(ipnet_score, total_ipnet, bucket[1][2])
+        elseif total_asn and total_asn > ip_score_lower_bound then
+          bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2])
+        elseif total_country and total_country > ip_score_lower_bound then
+          bucket[1][2] = resize_element(country_score, total_country, bucket[1][2])
         else
-          bucket = 1
+          bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2])
         end
       end
-
-      if ctime == 0 then ctime = ntime end
-
-      local lstr = string.format('%.3f:%.3f:%.3f', ntime, bucket, ctime)
-      table.insert(values, {limit[2], max_delay, lstr})
-    end, fun.zip(parse_limits(data), fun.iter(args)))
-
-    if #values > 0 then
-      local conn
-      ret,conn,upstream = rspamd_redis_make_request(task,
-        redis_params, -- connect params
-        key, -- hash key
-        true, -- is write
-        rate_set_cb, --callback
-        'setex', -- command
-        values[1] -- arguments
-      )
-
-      if conn then
-        fun.each(function(v)
-          conn:add_cmd('setex', v)
-        end, fun.drop_n(1, values))
-      else
-        rspamd_logger.errx(task, 'got error while connecting to redis')
-      end
     end
+    table.insert(args, bucket[1][1])
+    table.insert(args, bucket[1][2])
   end
-
-  local _
-  ret,_,upstream = rspamd_redis_make_request(task,
+  table.insert(args, rspamd_util.get_time())
+  table.insert(args, task:get_queue_id() or task:get_uid())
+  local ret = rspamd_redis_make_request(task,
     redis_params, -- connect params
-    key, -- hash key
-    false, -- is write
-    rate_get_cb, --callback
-    'mget', -- command
-    fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments
+    nil, -- hash key
+    true, -- is write
+    redis_cb, --callback
+    'evalsha', -- command
+    args -- arguments
   )
   if not ret then
     rspamd_logger.errx(task, 'got error connecting to redis')
   end
 end
 
---- Check or update ratelimit
-local function rate_test_set(task, func)
+local function ratelimit_cb(task)
+  if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
   local args = {}
   -- Get initial task data
   local ip = task:get_from_ip()
@@ -481,9 +472,13 @@ local function rate_test_set(task, func)
   local rcpts = task:get_recipients()
   local rcpts_user = {}
   if rcpts then
-    fun.each(function(r) table.insert(rcpts_user, r['user']) end, rcpts)
-    if fun.any(function(r)
-      fun.any(function(w) return r == w end, whitelisted_rcpts) end,
+    fun.each(function(r)
+      fun.each(function(type) table.insert(rcpts_user, r[type]) end, {'user', 'addr'})
+    end, rcpts)
+    if fun.any(
+      function(r)
+        if fun.any(function(w) return r == w end, whitelisted_rcpts) then return true end
+      end,
       rcpts_user) then
 
       rspamd_logger.infox(task, 'skip ratelimit for whitelisted recipient')
@@ -499,110 +494,136 @@ local function rate_test_set(task, func)
     end
   end
 
+  local redis_keys = {}
+  local redis_keys_rev = {}
+  local function collect_redis_keys()
+    local function collect_cb(err, data)
+      if err then
+        rspamd_logger.errx(task, 'redis error: %1', err)
+      else
+        for i, d in ipairs(data) do
+          if type(d) == 'string' then
+            local plim, size = parse_string_limit(d)
+            if plim then
+              table.insert(args, {{plim, size}, redis_keys_rev[i]})
+            end
+          end
+        end
+        return process_buckets(task, args)
+      end
+    end
+    local params, method
+    if limits_hash then
+      params = {limits_hash, rspamd_lua_utils.unpack(redis_keys)}
+      method = 'HMGET'
+    else
+      method = 'MGET'
+      params = redis_keys
+    end
+    local requested_keys = rspamd_redis_make_request(task,
+      redis_params, -- connect params
+      nil, -- hash key
+      true, -- is write
+      collect_cb, --callback
+      method, -- command
+      params -- arguments
+    )
+    if not requested_keys then
+      rspamd_logger.errx(task, 'got error connecting to redis')
+      return process_buckets(task, args)
+    end
+  end
+
   local rate_key
   for k in pairs(settings) do
     rate_key = dynamic_rate_key(task, k)
     if rate_key then
       if type(rate_key) == 'table' then
         for _, rk in ipairs(rate_key) do
-          if type(settings[k]) == 'table' then
-            table.insert(args, {settings[k], rk})
-          elseif type(settings[k]) == 'string' and
+          if type(settings[k]) == 'string' and
               (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
             local res = custom_keywords[settings[k]]['get_limit'](task)
-            if type(res) == 'table' then
-              table.insert(args, {res, rate_key})
-            elseif type(res) == 'string' then
-              local plim, size = parse_string_limit(res)
+            if type(res) == 'string' then res = {res} end
+            for _, r in ipairs(res) do
+              local plim, size = parse_string_limit(r, true)
               if plim then
-                table.insert(args, {{size, plim, 1}, rate_key})
+                table.insert(args, {{plim, size}, rk})
+              else
+                local rkey = string.match(settings[k], 'redis:(.*)')
+                if rkey then
+                  table.insert(redis_keys, rkey)
+                  redis_keys_rev[#redis_keys] = rk
+                else
+                  rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k])
+                end
               end
             end
           end
         end
       else
-        if type(settings[k]) == 'table' then
-          table.insert(args, {settings[k], rate_key})
-        elseif type(settings[k]) == 'string' and
-            (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
+        if type(settings[k]) == 'string' and
+          (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
           local res = custom_keywords[settings[k]]['get_limit'](task)
-          if type(res) == 'table' then
-            table.insert(args, {res, rate_key})
-          elseif type(res) == 'string' then
-            local plim, size = parse_string_limit(res)
+          if type(res) == 'string' then res = {res} end
+          for _, r in ipairs(res) do
+            local plim, size = parse_string_limit(r, true)
             if plim then
-              table.insert(args, {{size, plim, 1}, rate_key})
+              table.insert(args, {{plim, size}, rate_key})
+            else
+              local rkey = string.match(r, 'redis:(.*)')
+              if rkey then
+                table.insert(redis_keys, rkey)
+                redis_keys_rev[#redis_keys] = rate_key
+              else
+                rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k])
+              end
             end
           end
+        elseif type(settings[k]) == 'table' then
+          for _, rl in ipairs(settings[k]) do
+            table.insert(args, {{rl[1], rl[2]}, rate_key})
+          end
+        elseif type(settings[k]) == 'string' then
+          local rkey = string.match(settings[k], 'redis:(.*)')
+          if rkey then
+            table.insert(redis_keys, rkey)
+            redis_keys_rev[#redis_keys] = rate_key
+          else
+            rspamd_logger.infox(task, "Don't know what to do with limit: %1", settings[k])
+          end
         end
       end
     end
   end
 
-  if #args > 0 then
-    func(task, args)
-  end
-end
-
---- Check limit
-local function rate_test(task)
-  if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
-  rate_test_set(task, check_limits)
-end
---- Update limit
-local function rate_set(task)
-  local action = task:get_metric_action('default')
-
-  if action ~= 'soft reject' then
-    if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
-    rate_test_set(task, set_limits)
-  end
-end
-
-
---- Parse a single limit description
-local function parse_limit(str)
-  local params = rspamd_str_split(str, ':')
-
-  local function set_limit(limit, burst, rate)
-    limit[1] = tonumber(burst)
-    limit[2] = tonumber(rate)
-  end
-
-  if #params ~= 3 then
-    rspamd_logger.errx(rspamd_config, 'invalid limit definition: ' .. str)
-    return
-  end
-
-  local key_keywords = rspamd_str_split(params[1], '_')
-  for _, k in ipairs(key_keywords) do
-    if (custom_keywords[k] and type(custom_keywords[k]['get_value']) == 'function') or
-        (keywords[k] and type(keywords[k]['get_value']) == 'function') then
-      set_limit(settings[params[1]], params[2], params[3])
-    else
-      rspamd_logger.errx(rspamd_config, 'invalid limit type: ' .. params[1])
-    end
+  if redis_keys[1] then
+    return collect_redis_keys()
+  else
+    return process_buckets(task, args)
   end
 end
 
-local opts = rspamd_config:get_all_opt('ratelimit')
+local opts = rspamd_config:get_all_opt(N)
 if opts then
-  local rates = opts['limit']
-  if rates and type(rates) == 'table' then
-    fun.each(parse_limit, rates)
-  elseif rates and type(rates) == 'string' then
-    parse_limit(rates)
+  if opts['limit'] then
+    rspamd_logger.errx(rspamd_config, 'Legacy ratelimit config format no longer supported')
   end
 
   if opts['rates'] and type(opts['rates']) == 'table' then
     -- new way of setting limits
     fun.each(function(t, lim)
       if type(lim) == 'table' then
-        settings[t] = lim
+        settings[t] = {}
+        fun.each(function(l)
+          local plim, size = parse_string_limit(l)
+          if plim then
+            table.insert(settings[t], {plim, size})
+          end
+        end, lim)
       elseif type(lim) == 'string' then
         local plim, size = parse_string_limit(lim)
         if plim then
-          settings[t] = {size, plim, 1}
+          settings[t] = { {plim, size} }
         end
       end
     end, opts['rates'])
@@ -618,11 +639,7 @@ if opts then
 
   local enabled_limits = fun.totable(fun.map(function(t)
     return t
-  end, fun.filter(function(_, lim)
-    return type(lim) == 'string' or
-        (type(lim) == 'table' and type(lim[1]) == 'number' and lim[1] > 0)
-        or (type(lim) == 'table' and (lim[3]))
-  end, settings)))
+  end, settings))
   rspamd_logger.infox(rspamd_config, 'enabled rate buckets: [%1]', table.concat(enabled_limits, ','))
 
   if opts['whitelisted_rcpts'] and type(opts['whitelisted_rcpts']) == 'string' then
@@ -650,10 +667,6 @@ if opts then
     max_rcpt = tonumber(opts['max_rcpt'])
   end
 
-  if opts['max_delay'] then
-    max_rcpt = tonumber(opts['max_delay'])
-  end
-
   if opts['use_ip_score'] then
     use_ip_score = true
     local ip_score_opts = rspamd_config:get_all_opt('ip_score')
@@ -674,38 +687,30 @@ if opts then
     message_func = assert(load(opts['message_func']))()
   end
 
+  if opts['limits_hash'] then
+    limits_hash = opts['limits_hash']
+  end
+
   redis_params = rspamd_parse_redis_server('ratelimit')
   if not redis_params then
     rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
   else
-    if not ratelimit_symbol and not use_ip_score then
-      rspamd_config:register_symbol({
-        name = 'RATELIMIT_CHECK',
-        callback = rate_test,
-        type = 'prefilter',
-        priority = 4,
-      })
-    else
-      local symbol
-      if not ratelimit_symbol then
-        symbol = 'RATELIMIT_CHECK'
-      else
-        symbol = ratelimit_symbol
-      end
-      local id = rspamd_config:register_symbol({
-        name = symbol,
-        callback = rate_test,
-      })
-      if use_ip_score then
-        rspamd_config:register_dependency(id, 'IP_SCORE')
-      end
+    local s = {
+      type = 'prefilter,nostat',
+      name = 'RATELIMIT_CHECK',
+      priority = 4,
+      callback = ratelimit_cb,
+    }
+    if use_ip_score then
+      s.type = 'normal'
+    end
+    if ratelimit_symbol then
+      s.name = ratelimit_symbol
+    end
+    local id = rspamd_config:register_symbol(s)
+    if use_ip_score then
+      rspamd_config:register_dependency(id, 'IP_SCORE')
     end
-    rspamd_config:register_symbol({
-      name = 'RATELIMIT_SET',
-      type = 'postfilter',
-      priority = 5,
-      callback = rate_set,
-    })
     for _, v in pairs(custom_keywords) do
       if type(v) == 'table' and type(v['init']) == 'function' then
         v['init']()
@@ -713,5 +718,6 @@ if opts then
     end
   end
 end
-
-
+rspamd_config:add_on_load(function(cfg, ev_base, worker)
+  load_scripts(cfg, ev_base)
+end)

+ 14 - 55
data/conf/rspamd/custom/ratelimit.lua

@@ -1,66 +1,25 @@
 local custom_keywords = {
   ['customrl'] = {},
 }
+
 function custom_keywords.customrl.get_value(task)
   local rspamd_logger = require "rspamd_logger"
-  local rspamd_redis = require "rspamd_redis"
-  local rspamd_regexp = require "rspamd_regexp"
-  local re = rspamd_regexp.create('/^\\s*$/i')
-  local envfrom = task:get_from(1)
-  local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case
-  local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case
-
-  local function rlo(object) -- get ratelimited object
-    local rlobj = string.format('%s', object)
-
-    local rl_ret, rl_obj = rspamd_redis.make_request_sync({host="172.22.1.249:6379", cmd='HGET', args={'RL_OBJECT', rlobj}, timeout=2.0})
-
-    if rl_ret and rl_obj then
-      return rl_obj
-    else
-      return false
-    end
-  end
-
-  rl_addr = rlo(env_from_addr)
-  rl_domain = rlo(env_from_domain)
-  if type(rl_addr) == 'string' and not re:match(rl_addr) then
-    rspamd_logger.infox(rspamd_config, "returning ratelimit object for %s", env_from_addr)
-    return rl_addr
-  elseif type(rl_domain) == 'string' and not re:match(rl_domain) then
-    rspamd_logger.infox(rspamd_config, "returning ratelimit object for %s", env_from_domain)
-    return rl_domain
+  if task:has_symbol('DYN_RL') then
+    rspamd_logger.infox(rspamd_config, "task has a dynamic ratelimit symbol, processing...")
+    return "check"
+  else
+    rspamd_logger.infox(rspamd_config, "task has no dynamic ratelimit symbol, skipping...")
+    return
   end
 end
 function custom_keywords.customrl.get_limit(task)
   local rspamd_logger = require "rspamd_logger"
-  local rspamd_redis = require "rspamd_redis"
-  local rspamd_regexp = require "rspamd_regexp"
-  local re = rspamd_regexp.create('/^\\s*$/i')
-  local envfrom = task:get_from(1)
-  local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case
-  local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case
-
-  local function rlv(object) -- get ratelimited object
-    local rlobj = string.format('%s', object)
-
-    local rl_ret, rl_value = rspamd_redis.make_request_sync({host="172.22.1.249:6379", cmd='HGET', args={'RL_VALUE', rlobj}, timeout=2.0})
-
-    if rl_ret and rl_value then
-      return rl_value
-    else
-      return false
-    end
-  end
-
-  rl_addr = rlv(env_from_addr)
-  rl_domain = rlv(env_from_domain)
-  if type(rl_addr) == 'string' and not re:match(rl_addr) then
-    rspamd_logger.infox(rspamd_config, "returning ratelimit %s for %s", rl_addr, env_from_addr)
-    return rl_addr
-  elseif type(rl_domain) == 'string' and not re:match(rl_domain) then
-    rspamd_logger.infox(rspamd_config, "returning ratelimit %s for %s", rl_domain, env_from_domain)
-    return rl_domain
+  local dyn_rl_symbol = task:get_symbol("DYN_RL")
+  if dyn_rl_symbol then
+    local rl_value = dyn_rl_symbol[1].options[1]
+    rspamd_logger.infox(rspamd_config, "dynamic ratelimit symbol has option %s, returning...", rl_value)
+    return rl_value
   end
 end
-return custom_keywords
+-- returning custom keywords
+return custom_keywords

+ 57 - 0
data/conf/rspamd/lua/rspamd.local.lua

@@ -50,4 +50,61 @@ rspamd_config:register_symbol({
   priority = 11
 })
 
+rspamd_config:register_symbol({
+  name = 'DYN_RL_CHECK',
+  type = 'prefilter',
+  callback = function(task)
+    local util = require("rspamd_util")
+    local redis_params = rspamd_parse_redis_server('dyn_rl')
+    local rspamd_logger = require "rspamd_logger"
+    local envfrom = task:get_from(1)
+    local env_from_domain = envfrom[1].domain:lower() -- get smtp from domain in lower case
+    local env_from_addr = envfrom[1].addr:lower() -- get smtp from addr in lower case
+
+    local function redis_cb_user(err, data)
+
+      if err or type(data) ~= 'string' then
+        rspamd_logger.infox(rspamd_config, "dynamic ratelimit request for user %s returned invalid or empty data (\"%s\") or error (\"%s\") - trying dynamic ratelimit for domain...", env_from_addr, data, err)
+
+        local function redis_key_cb_domain(err, data)
+          if err or type(data) ~= 'string' then
+            rspamd_logger.infox(rspamd_config, "dynamic ratelimit request for domain %s returned invalid or empty data (\"%s\") or error (\"%s\")", env_from_domain, data, err)
+          else
+            rspamd_logger.infox(rspamd_config, "found dynamic ratelimit in redis for domain %s with value %s", env_from_domain, data)
+            task:insert_result('DYN_RL', 0.0, data)
+          end
+        end
 
+        local redis_ret_domain = rspamd_redis_make_request(task,
+          redis_params, -- connect params
+          env_from_domain, -- hash key
+          false, -- is write
+          redis_key_cb_domain, --callback
+          'HGET', -- command
+          {'RL_VALUE', env_from_domain} -- arguments
+        )
+        if not redis_ret_domain then
+          rspamd_logger.infox(rspamd_config, "cannot make request to load ratelimit for domain")
+        end
+      else
+        rspamd_logger.infox(rspamd_config, "found dynamic ratelimit in redis for user %s with value %s", env_from_addr, data)
+        task:insert_result('DYN_RL', 0.0, data)
+      end
+
+    end
+
+    local redis_ret_user = rspamd_redis_make_request(task,
+      redis_params, -- connect params
+      env_from_addr, -- hash key
+      false, -- is write
+      redis_cb_user, --callback
+      'HGET', -- command
+      {'RL_VALUE', env_from_addr} -- arguments
+    )
+    if not redis_ret_user then
+      rspamd_logger.infox(rspamd_config, "cannot make request to load ratelimit for user")
+    end
+    return true
+  end,
+  priority = 20
+})

+ 1 - 5
data/web/edit.php

@@ -238,7 +238,6 @@ if (isset($_SESSION['mailcow_cc_role'])) {
           }
           ?>
       <hr>
-      <!--
       <form data-id="domratelimit" class="form-inline well" method="post">
         <div class="form-group">
           <label class="control-label">Ratelimit</label>
@@ -256,7 +255,6 @@ if (isset($_SESSION['mailcow_cc_role'])) {
         </div>
       </form>
       <hr>
-      -->
       <div class="row">
         <div class="col-sm-6">
           <h4><?=$lang['user']['spamfilter_wl'];?></h4>
@@ -316,7 +314,7 @@ if (isset($_SESSION['mailcow_cc_role'])) {
       !empty($_GET["aliasdomain"])) {
         $alias_domain = $_GET["aliasdomain"];
         $result = mailbox('get', 'alias_domain_details', $alias_domain);
-        // $rl = mailbox('get', 'domain_ratelimit', $alias_domain);
+        $rl = mailbox('get', 'domain_ratelimit', $alias_domain);
         if (!empty($result)) {
         ?>
           <h4><?=$lang['edit']['edit_alias_domain'];?></h4>
@@ -341,7 +339,6 @@ if (isset($_SESSION['mailcow_cc_role'])) {
               </div>
             </div>
           </form>
-          <!--
           <hr>
           <form data-id="domratelimit" class="form-inline well" method="post">
             <div class="form-group">
@@ -359,7 +356,6 @@ if (isset($_SESSION['mailcow_cc_role'])) {
               <button class="btn btn-default" id="edit_selected" data-id="domratelimit" data-item="<?=$alias_domain;?>" data-api-url='edit/domain-ratelimit' data-api-attr='{}' href="#"><?=$lang['admin']['save'];?></button>
             </div>
           </form>
-          -->
           <?php
           if (!empty($dkim = dkim('details', $alias_domain))) {
           ?>

+ 1 - 3
data/web/inc/functions.mailbox.inc.php

@@ -1214,7 +1214,6 @@ function mailbox($_action, $_type, $_data = null) {
             }
             if (empty($rl_value)) {
               try {
-                $redis->hDel('RL_OBJECT', $domain);
                 $redis->hDel('RL_VALUE', $domain);
               }
               catch (RedisException $e) {
@@ -1227,7 +1226,6 @@ function mailbox($_action, $_type, $_data = null) {
             }
             else {
               try {
-                $redis->hSet('RL_OBJECT', $domain, '1');
                 $redis->hSet('RL_VALUE', $domain, $rl_value . ' / 1' . $rl_frame);
               }
               catch (RedisException $e) {
@@ -2389,7 +2387,7 @@ function mailbox($_action, $_type, $_data = null) {
             return false;
           }
           try {
-            if (($rl_value = $redis->hGet('RL_VALUE', $_data)) && $redis->hGet('RL_OBJECT', $_data)) {
+            if ($rl_value = $redis->hGet('RL_VALUE', $_data)) {
               $rl = explode(' / 1', $rl_value);
               $data['value'] = $rl[0];
               $data['frame'] = $rl[1];

+ 1 - 1
docker-compose.yml

@@ -80,7 +80,7 @@ services:
             - clamd
 
     rspamd-mailcow:
-      image: mailcow/rspamd:1.5
+      image: mailcow/rspamd:1.6
       build: ./data/Dockerfiles/rspamd
       command: > 
         /bin/bash -c "