--[[
Copyright (c) 2011-2017, Vsevolod Stakhov <vsevolod@highsecure.ru>
Copyright (c) 2016-2017, Andrew Lewis <nerf@judo.za.org>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--

if confighelp then
  return
end

-- A plugin that implements ratelimits using redis

-- `E` is an empty table used for nil-safe chained indexing: ((x or E)[1] or E).f
-- `settings` maps a bucket type (e.g. 'to_ip') either to a list of
-- {period, limit} pairs or to a string naming a dynamic limit source
local E, settings = {}, {}
local N = 'ratelimit'
-- Senders that are considered as bounce
local bounce_senders = {'postmaster', 'mailer-daemon', '', 'null', 'fetchmail-daemon', 'mdaemon'}
-- Do not check ratelimits for these recipients
local whitelisted_rcpts = {'postmaster', 'mailer-daemon'}
local whitelisted_ip
local whitelisted_user
-- Maximum number of recipients expanded into per-recipient buckets
local max_rcpt = 5
local redis_params
-- When set, a symbol is inserted instead of setting a 'soft reject' pre-result
local ratelimit_symbol
local use_ip_score = false
-- First component of every redis key built by this module
local rl_prefix = 'RL'
local ip_score_lower_bound = 10
local ip_score_ham_multiplier = 1.1
local ip_score_spam_divisor = 1.1
-- Optional redis hash holding dynamic limits (queried with HMGET instead of MGET)
local limits_hash

-- Builds the rejection message; may be replaced via the `message_func` option
local message_func = function(_, limit_type)
  return string.format('Ratelimit "%s" exceeded', limit_type)
end

local rspamd_logger = require "rspamd_logger"
local rspamd_util = require "rspamd_util"
local rspamd_lua_utils = require "lua_util"
local lua_redis = require "lua_redis"
local fun = require "fun"

-- Keywords that mark a bucket type as applicable to authenticated users
local user_keywords = {'user'}

-- SHA returned by redis `SCRIPT LOAD`; nil until the script has been loaded
local redis_script_sha

-- Redis-side script used in pre-result mode.
-- KEYS: bucket keys; ARGV: one (period, limit) pair per key, then `now`, then
-- the queue id. Returns {limited, bucket}; the message is recorded in every
-- bucket only when no limit is exceeded.
local redis_script = [[
local bucket
local limited = false
local buckets = {}
local queue_id = table.remove(ARGV)
local now = table.remove(ARGV)

local argi = 0
for i = 1, #KEYS do
  local key = KEYS[i]
  local period = tonumber(ARGV[argi+1])
  local limit = tonumber(ARGV[argi+2])
  if not buckets[key] then
    buckets[key] = {
      max_period = period,
      limits = { {period, limit} },
    }
  else
    table.insert(buckets[key].limits, {period, limit})
    if period > buckets[key].max_period then
      buckets[key].max_period = period
    end
  end
  argi = argi + 2
end

for k, v in pairs(buckets) do
  local maxp = v.max_period
  redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
  for _, lim in ipairs(v.limits) do
    local period = lim[1]
    local limit = lim[2]
    local rate
    if period == maxp then
      rate = redis.call('ZCARD', k)
    else
      rate = redis.call('ZCOUNT', k, now - period, '+inf')
    end
    if rate and rate >= limit then
      limited = true
      bucket = k
    end
  end
  redis.call('EXPIRE', k, maxp)
  if limited then break end
end

if not limited then
  for k in pairs(buckets) do
    redis.call('ZADD', k, now, queue_id)
  end
end

return {limited, bucket}
]]

-- Redis-side script used in symbol mode. Same KEYS/ARGV layout as above.
-- Returns a list of {bucket, multiplier} for every limit whose multiplier
-- 2 * tanh(rate / (2 * limit)) reaches 0.5; the message is always recorded.
local redis_script_symbol = [[
local limited = false
local buckets, results = {}, {}
local queue_id = table.remove(ARGV)
local now = table.remove(ARGV)

local argi = 0
for i = 1, #KEYS do
  local key = KEYS[i]
  local period = tonumber(ARGV[argi+1])
  local limit = tonumber(ARGV[argi+2])
  if not buckets[key] then
    buckets[key] = {
      max_period = period,
      limits = { {period, limit} },
    }
  else
    table.insert(buckets[key].limits, {period, limit})
    if period > buckets[key].max_period then
      buckets[key].max_period = period
    end
  end
  argi = argi + 2
end

for k, v in pairs(buckets) do
  local maxp = v.max_period
  redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
  for _, lim in ipairs(v.limits) do
    local period = lim[1]
    local limit = lim[2]
    local rate
    if period == maxp then
      rate = redis.call('ZCARD', k)
    else
      rate = redis.call('ZCOUNT', k, now - period, '+inf')
    end
    if rate then
      local mult = 2 * math.tanh(rate / (limit * 2))
      if mult >= 0.5 then
        table.insert(results, {k, tostring(mult)})
      end
    end
  end
  redis.call('ZADD', k, now, queue_id)
  redis.call('EXPIRE', k, maxp)
end

return results
]]

--- Upload the active ratelimit script to redis and remember its SHA.
-- Does nothing when no redis servers are configured.
-- @param cfg rspamd config object (used for logging and the request)
-- @param ev_base event base passed by `add_on_load`
local function load_scripts(cfg, ev_base)
  -- The on-load hook is registered unconditionally, so redis may be absent
  if not redis_params then
    return
  end

  local function rl_script_cb(err, data)
    if err then
      rspamd_logger.errx(cfg, 'Script loading failed: %s', err)
    elseif type(data) == 'string' then
      redis_script_sha = data
    end
  end

  local script
  if ratelimit_symbol then
    script = redis_script_symbol
  else
    script = redis_script
  end

  lua_redis.redis_make_request_taskless(
    ev_base,
    cfg,
    redis_params,
    nil, -- key
    true, -- is write
    rl_script_cb, --callback
    'SCRIPT', -- command
    {'LOAD', script}
  )
end

-- Lazily built table of lpeg patterns used by parse_string_limit
local limit_parser

--- Parse a textual limit of the form "<count>[k|m|g] / <time>[s|m|h|d]".
-- @param lim string to parse, e.g. "100 / 1m"
-- @param no_error when true, a parse failure is not logged
-- @return period in seconds and message count, or nil on failure
local function parse_string_limit(lim, no_error)
  local function parse_time_suffix(s)
    if s == 's' then
      return 1
    elseif s == 'm' then
      return 60
    elseif s == 'h' then
      return 3600
    elseif s == 'd' then
      return 86400
    end
  end

  local function parse_num_suffix(s)
    if s == '' then
      return 1
    elseif s == 'k' then
      return 1000
    elseif s == 'm' then
      return 1000000
    elseif s == 'g' then
      return 1000000000
    end
  end

  local lpeg = require "lpeg"

  if not limit_parser then
    local digit = lpeg.R("09")
    limit_parser = {}
    limit_parser.integer =
      (lpeg.S("+-") ^ -1) *
      (digit ^ 1)
    limit_parser.fractional =
      (lpeg.P(".")) *
      (digit ^ 1)
    limit_parser.number =
      (limit_parser.integer *
        (limit_parser.fractional ^ -1)) +
      (lpeg.S("+-") * limit_parser.fractional)
    -- Fold "number * optional suffix multiplier" into a single value
    limit_parser.time = lpeg.Cf(lpeg.Cc(1) *
      (limit_parser.number / tonumber) *
      ((lpeg.S("smhd") / parse_time_suffix) ^ -1),
      function(acc, val) return acc * val end)
    limit_parser.suffixed_number = lpeg.Cf(lpeg.Cc(1) *
      (limit_parser.number / tonumber) *
      ((lpeg.S("kmg") / parse_num_suffix) ^ -1),
      function(acc, val) return acc * val end)
    limit_parser.limit = lpeg.Ct(limit_parser.suffixed_number *
      (lpeg.S(" ") ^ 0) * lpeg.S("/") * (lpeg.S(" ") ^ 0) *
      limit_parser.time)
  end

  -- lpeg.match raises on non-string subjects; treat them as a bad limit
  if type(lim) == 'string' then
    local t = lpeg.match(limit_parser.limit, lim)

    -- t[1] is the count, t[2] the period; a zero period is rejected
    if t and t[1] and t[2] and t[2] ~= 0 then
      return t[2], t[1]
    end
  end

  if not no_error then
    rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim)
  end

  return nil
end

--- Scale a limit according to an ip_score reputation pair.
-- @param x_score accumulated score for the entity
-- @param x_total number of observations for the entity (nil counts as 0)
-- @param element the limit value to scale
-- @return the scaled limit
local function resize_element(x_score, x_total, element)
  local x_ip_score
  if not x_total then x_total = 0 end
  if x_total < ip_score_lower_bound or x_total <= 0 then
    -- Too few observations: use a fixed score of 1
    x_score = 1
  else
    x_score = x_score / x_total
  end
  if x_score > 0 then
    x_ip_score = x_score / ip_score_spam_divisor
    element = element * rspamd_util.tanh(2.718281 * x_ip_score)
  elseif x_score < 0 then
    x_ip_score = ((1 + (x_score * -1)) * ip_score_ham_multiplier)
    element = element * x_ip_score
  end
  return element
end

--- Check whether this addr is bounce
-- @param from local part of the sender address
-- @return true when `from` is listed in bounce_senders
local function check_bounce(from)
  return fun.any(function(b) return b == from end, bounce_senders)
end

-- User-supplied keywords loaded from the `custom_keywords` option
local custom_keywords = {}

-- Built-in keywords. Each `get_value(task)` returns the key component for the
-- task, or nil when the bucket does not apply to it.
local keywords = {
  ['ip'] = {
    ['get_value'] = function(task)
      local ip = task:get_ip()
      if ip and ip:is_valid() then return ip end
      return nil
    end,
  },
  ['rip'] = {
    ['get_value'] = function(task)
      local ip = task:get_ip()
      if ip and ip:is_valid() and not ip:is_local() then return ip end
      return nil
    end,
  },
  ['from'] = {
    ['get_value'] = function(task)
      local from = task:get_from(0)
      if ((from or E)[1] or E).addr then
        return string.lower(from[1]['addr'])
      end
      return nil
    end,
  },
  ['bounce'] = {
    ['get_value'] = function(task)
      local from = task:get_from(0)
      if not ((from or E)[1] or E).user then
        return '_'
      end
      -- bounce_senders is lowercase, so compare case-insensitively
      -- (e.g. 'MAILER-DAEMON')
      if check_bounce(string.lower(from[1]['user'])) then
        return '_'
      end
      return nil
    end,
  },
  ['asn'] = {
    ['get_value'] = function(task)
      return task:get_mempool():get_variable('asn') or nil
    end,
  },
  ['user'] = {
    ['get_value'] = function(task)
      return task:get_user() or nil
    end,
  },
  ['to'] = {
    ['get_value'] = function()
      return '%s' -- 'to' is special
    end,
  },
}

--- Build the redis key(s) for a bucket type and a task.
-- @param task the task being checked
-- @param rtype bucket type: keywords joined by '_' (e.g. 'to_ip')
-- @return a key string, a list of keys (one per recipient, at most max_rcpt)
--   when the type contains 'to', or nil when the bucket does not apply
local function dynamic_rate_key(task, rtype)
  local key_t = {rl_prefix, rtype}
  local key_keywords = rspamd_str_split(rtype, '_')
  local have_to, have_user = false, false
  -- Positions in key_t holding the '%s' recipient placeholder
  local to_slots = {}

  for _, v in ipairs(key_keywords) do
    local custom = custom_keywords[v]

    if custom and type(custom['condition']) == 'function' then
      if not custom['condition']() then return nil end
    end

    -- Custom keywords take precedence over built-in ones
    local ret
    if custom and type(custom['get_value']) == 'function' then
      ret = custom['get_value'](task)
    elseif keywords[v] and type(keywords[v]['get_value']) == 'function' then
      ret = keywords[v]['get_value'](task)
    end
    if not ret then return nil end

    if not have_user then
      for _, uk in ipairs(user_keywords) do
        if v == uk then
          have_user = true
          break
        end
      end
    end

    if type(ret) ~= 'string' then ret = tostring(ret) end

    if v == 'to' then
      have_to = true
      if ret == '%s' then
        to_slots[#to_slots + 1] = #key_t + 1
      end
    end

    table.insert(key_t, ret)
  end

  -- Buckets without a user keyword are not applied to authenticated users
  if (not have_user) and task:get_user() then
    return nil
  end

  if not have_to then
    return table.concat(key_t, ":")
  end

  local rcpts = task:get_recipients(0)
  if not ((rcpts or E)[1] or E).addr then
    return nil
  end

  -- The placeholder slots are filled in directly rather than running
  -- string.format over the whole key: other components (such as a sender
  -- address) may legitimately contain '%' and would break the format call.
  local rate_keys = {}
  for _, r in ipairs(rcpts) do
    if #rate_keys >= max_rcpt then break end
    if r['addr'] then
      local rcpt = string.lower(r['addr'])
      for _, slot in ipairs(to_slots) do
        key_t[slot] = rcpt
      end
      rate_keys[#rate_keys + 1] = table.concat(key_t, ":")
    end
  end

  return rate_keys
end

--- Run the ratelimit script in redis for the collected buckets.
-- @param task the task being checked
-- @param buckets list of { {period, limit}, redis_key } entries
local function process_buckets(task, buckets)
  if not buckets or #buckets == 0 then return end

  -- Without the SHA the EVALSHA argument list would be malformed
  if not redis_script_sha then
    rspamd_logger.errx(task, 'ratelimit script is not loaded, skip ratelimit check')
    return
  end

  local function rl_redis_cb(err, data)
    if err then
      rspamd_logger.infox(task, 'got error while setting limit: %1', err)
    end
    if type(data) ~= 'table' then return end
    -- data is {limited, bucket}; a true `limited` arrives as integer 1
    if data[1] == 1 then
      rspamd_logger.infox(task,
        'ratelimit "%s" exceeded',
        data[2])
      task:set_pre_result('soft reject',
        message_func(task, data[2]))
    end
  end

  local function rl_symbol_redis_cb(err, data)
    if err then
      rspamd_logger.infox(task, 'got error while setting limit: %1', err)
    end
    if type(data) ~= 'table' then return end
    -- Each entry is {bucket, multiplier}
    for i, b in ipairs(data) do
      task:insert_result(ratelimit_symbol, b[2], string.format('%s:%s:%s', i, b[1], b[2]))
    end
  end

  local redis_cb = rl_redis_cb
  if ratelimit_symbol then redis_cb = rl_symbol_redis_cb end

  -- The ip_score data is per task, so fetch it once for all buckets
  local asn_score, total_asn,
    country_score, total_country,
    ipnet_score, total_ipnet,
    ip_score, total_ip
  if use_ip_score then
    asn_score, total_asn,
      country_score, total_country,
      ipnet_score, total_ipnet,
      ip_score, total_ip = task:get_mempool():get_variable('ip_score',
        'double,double,double,double,double,double,double,double')
  end

  -- Scale the limit of one bucket according to ip_score data
  local function scale_limit(bucket)
    -- The key looks like '<prefix>:<rtype>:<values...>'. Keywords must be
    -- looked up in the rtype component alone: splitting the whole key on '_'
    -- glues the prefix and the values onto the first and last keywords.
    local rtype = string.match(bucket[2], '^[^:]+:([^:]+)')
    if not rtype then return end

    local has_asn, has_ip = false, false
    for _, v in ipairs(rspamd_str_split(rtype, '_')) do
      if v == "asn" then has_asn = true end
      if v == "ip" then has_ip = true end
      if has_ip and has_asn then break end
    end

    local limit = bucket[1]
    if has_asn and not has_ip then
      limit[2] = resize_element(asn_score, total_asn, limit[2])
    elseif has_ip then
      -- Use the most specific entity that has enough observations
      if total_ip and total_ip > ip_score_lower_bound then
        limit[2] = resize_element(ip_score, total_ip, limit[2])
      elseif total_ipnet and total_ipnet > ip_score_lower_bound then
        limit[2] = resize_element(ipnet_score, total_ipnet, limit[2])
      elseif total_asn and total_asn > ip_score_lower_bound then
        limit[2] = resize_element(asn_score, total_asn, limit[2])
      elseif total_country and total_country > ip_score_lower_bound then
        limit[2] = resize_element(country_score, total_country, limit[2])
      else
        limit[2] = resize_element(ip_score, total_ip, limit[2])
      end
    end
  end

  -- EVALSHA <sha> <numkeys> <keys...> <period limit ...> <now> <queue id>
  local args = {redis_script_sha, #buckets}
  for _, bucket in ipairs(buckets) do
    table.insert(args, bucket[2])
  end
  for _, bucket in ipairs(buckets) do
    if use_ip_score then
      scale_limit(bucket)
    end
    table.insert(args, bucket[1][1])
    table.insert(args, bucket[1][2])
  end
  table.insert(args, rspamd_util.get_time())
  table.insert(args, task:get_queue_id() or task:get_uid())

  local ret = rspamd_redis_make_request(task,
    redis_params, -- connect params
    nil, -- hash key
    true, -- is write
    redis_cb, --callback
    'evalsha', -- command
    args -- arguments
  )
  if not ret then
    rspamd_logger.errx(task, 'got error connecting to redis')
  end
end

--- Symbol callback: apply whitelists, collect the buckets matching the task
-- and hand them over to process_buckets.
-- @param task the task being checked
local function ratelimit_cb(task)
  if rspamd_lua_utils.is_rspamc_or_controller(task) then return end

  -- Buckets to check: { {period, limit}, redis_key }
  local args = {}

  -- Get initial task data
  local ip = task:get_from_ip()
  if ip and ip:is_valid() and whitelisted_ip then
    if whitelisted_ip:get_key(ip) then
      -- Do not check whitelisted ip
      rspamd_logger.infox(task, 'skip ratelimit for whitelisted IP')
      return
    end
  end

  -- Skip the check when any recipient (local part or full address) is whitelisted
  local rcpts = task:get_recipients()
  if rcpts then
    local function is_whitelisted(value)
      return value ~= nil and
        fun.any(function(w) return value == w end, whitelisted_rcpts)
    end

    for _, r in ipairs(rcpts) do
      if is_whitelisted(r['user']) or is_whitelisted(r['addr']) then
        rspamd_logger.infox(task, 'skip ratelimit for whitelisted recipient')
        return
      end
    end
  end

  -- Get user (authuser)
  if whitelisted_user then
    local auser = task:get_user()
    if auser and whitelisted_user:get_key(auser) then
      rspamd_logger.infox(task, 'skip ratelimit for whitelisted user')
      return
    end
  end

  -- Redis keys holding dynamic limits, and the bucket each one belongs to
  local redis_keys = {}
  local redis_keys_rev = {}

  -- Fetch the dynamic limits from redis, then process all buckets
  local function collect_redis_keys()
    local function collect_cb(err, data)
      if err then
        rspamd_logger.errx(task, 'redis error: %1', err)
        return
      end

      if type(data) == 'table' then
        for i, d in ipairs(data) do
          if type(d) == 'string' then
            local plim, size = parse_string_limit(d)
            if plim then
              table.insert(args, {{plim, size}, redis_keys_rev[i]})
            end
          end
        end
      end

      return process_buckets(task, args)
    end

    local params, method
    if limits_hash then
      params = {limits_hash, rspamd_lua_utils.unpack(redis_keys)}
      method = 'HMGET'
    else
      method = 'MGET'
      params = redis_keys
    end

    local requested_keys = rspamd_redis_make_request(task,
      redis_params, -- connect params
      nil, -- hash key
      true, -- is write
      collect_cb, --callback
      method, -- command
      params -- arguments
    )
    if not requested_keys then
      rspamd_logger.errx(task, 'got error connecting to redis')
      return process_buckets(task, args)
    end
  end

  -- Queue a 'redis:<key>' limit for lookup; returns false for anything else
  local function add_redis_limit(spec, bucket_key)
    local rkey = string.match(spec, 'redis:(.*)')
    if not rkey then
      return false
    end
    table.insert(redis_keys, rkey)
    redis_keys_rev[#redis_keys] = bucket_key
    return true
  end

  -- Register every limit configured for bucket type `name` against one
  -- redis bucket key. Used for plain keys and per-recipient keys alike so
  -- that both support static, custom and redis-backed limits.
  local function add_limits(name, bucket_key)
    local setting = settings[name]

    if type(setting) == 'table' then
      for _, rl in ipairs(setting) do
        table.insert(args, {{rl[1], rl[2]}, bucket_key})
      end
    elseif type(setting) == 'string' then
      local custom = custom_keywords[setting]

      if custom and type(custom['get_limit']) == 'function' then
        local res = custom['get_limit'](task)
        if type(res) == 'string' then res = {res} end
        if type(res) == 'table' then
          for _, r in ipairs(res) do
            local plim, size = parse_string_limit(r, true)
            if plim then
              table.insert(args, {{plim, size}, bucket_key})
            elseif not (type(r) == 'string' and add_redis_limit(r, bucket_key)) then
              rspamd_logger.infox(task, "Don't know what to do with limit: %1", r)
            end
          end
        end
      elseif not add_redis_limit(setting, bucket_key) then
        rspamd_logger.infox(task, "Don't know what to do with limit: %1", setting)
      end
    end
  end

  for k in pairs(settings) do
    local rate_key = dynamic_rate_key(task, k)
    if type(rate_key) == 'table' then
      for _, rk in ipairs(rate_key) do
        add_limits(k, rk)
      end
    elseif rate_key then
      add_limits(k, rate_key)
    end
  end

  if redis_keys[1] then
    return collect_redis_keys()
  else
    return process_buckets(task, args)
  end
end

local opts = rspamd_config:get_all_opt(N)
if opts then
  if opts['limit'] then
    rspamd_logger.errx(rspamd_config, 'Legacy ratelimit config format no longer supported')
  end

  if opts['rates'] and type(opts['rates']) == 'table' then
    -- new way of setting limits
    fun.each(function(t, lim)
      if type(lim) == 'table' then
        settings[t] = {}
        fun.each(function(l)
          local plim, size = parse_string_limit(l)
          if plim then
            table.insert(settings[t], {plim, size})
          end
        end, lim)
      elseif type(lim) == 'string' then
        local plim, size = parse_string_limit(lim)
        if plim then
          settings[t] = { {plim, size} }
        end
      end
    end, opts['rates'])
  end

  if opts['dynamic_rates'] and type(opts['dynamic_rates']) == 'table' then
    fun.each(function(t, lim)
      if type(lim) == 'string' then
        settings[t] = lim
      end
    end, opts['dynamic_rates'])
  end

  local enabled_limits = {}
  for t in pairs(settings) do
    enabled_limits[#enabled_limits + 1] = t
  end
  rspamd_logger.infox(rspamd_config, 'enabled rate buckets: [%1]', table.concat(enabled_limits, ','))

  if opts['whitelisted_rcpts'] and type(opts['whitelisted_rcpts']) == 'string' then
    whitelisted_rcpts = rspamd_str_split(opts['whitelisted_rcpts'], ',')
  elseif type(opts['whitelisted_rcpts']) == 'table' then
    whitelisted_rcpts = opts['whitelisted_rcpts']
  end

  if opts['whitelisted_ip'] then
    whitelisted_ip = rspamd_map_add('ratelimit', 'whitelisted_ip', 'radix',
      'Ratelimit whitelist ip map')
  end

  if opts['whitelisted_user'] then
    whitelisted_user = rspamd_map_add('ratelimit', 'whitelisted_user', 'set',
      'Ratelimit whitelist user map')
  end

  if opts['symbol'] then
    -- We want symbol instead of pre-result
    ratelimit_symbol = opts['symbol']
  end

  if opts['max_rcpt'] then
    -- Keep the default when the option is not a number
    max_rcpt = tonumber(opts['max_rcpt']) or max_rcpt
  end

  if opts['use_ip_score'] then
    use_ip_score = true
    local ip_score_opts = rspamd_config:get_all_opt('ip_score')
    if ip_score_opts and ip_score_opts['lower_bound'] then
      ip_score_lower_bound = ip_score_opts['lower_bound']
    end
  end

  if opts['custom_keywords'] then
    local loaded = dofile(opts['custom_keywords'])
    if type(loaded) == 'table' then
      custom_keywords = loaded
    else
      rspamd_logger.errx(rspamd_config, 'custom_keywords file %s must return a table',
        opts['custom_keywords'])
    end
  end

  if opts['user_keywords'] then
    user_keywords = opts['user_keywords']
  end

  if opts['message_func'] then
    -- Lua 5.1 compiles strings with loadstring; later versions use load.
    -- NOTE: this executes code taken from the configuration file.
    local compile = loadstring or load
    message_func = assert(compile(opts['message_func']))()
  end

  if opts['limits_hash'] then
    limits_hash = opts['limits_hash']
  end

  redis_params = rspamd_parse_redis_server('ratelimit')
  if not redis_params then
    rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
  else
    local s = {
      type = 'prefilter,nostat',
      name = 'RATELIMIT_CHECK',
      priority = 4,
      callback = ratelimit_cb,
    }

    if use_ip_score then
      -- Must run as a normal symbol so that it can depend on IP_SCORE
      s.type = 'normal'
    end

    if ratelimit_symbol then
      s.name = ratelimit_symbol
    end

    local id = rspamd_config:register_symbol(s)
    if use_ip_score then
      rspamd_config:register_dependency(id, 'IP_SCORE')
    end

    for _, v in pairs(custom_keywords) do
      if type(v) == 'table' and type(v['init']) == 'function' then
        v['init']()
      end
    end
  end
end

rspamd_config:add_on_load(function(cfg, ev_base)
  load_scripts(cfg, ev_base)
end)