diff --git a/Makefile b/Makefile index 0e5bcc719650..57b4ae0142e7 100644 --- a/Makefile +++ b/Makefile @@ -332,6 +332,12 @@ install: runtime $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/limit-count $(ENV_INSTALL) apisix/plugins/limit-count/*.lua $(ENV_INST_LUADIR)/apisix/plugins/limit-count/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/limit-count/sliding-window + $(ENV_INSTALL) apisix/plugins/limit-count/sliding-window/*.lua $(ENV_INST_LUADIR)/apisix/plugins/limit-count/sliding-window/ + + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/limit-count/sliding-window/store + $(ENV_INSTALL) apisix/plugins/limit-count/sliding-window/store/*.lua $(ENV_INST_LUADIR)/apisix/plugins/limit-count/sliding-window/store/ + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/opa $(ENV_INSTALL) apisix/plugins/opa/*.lua $(ENV_INST_LUADIR)/apisix/plugins/opa/ diff --git a/apisix-master-0.rockspec b/apisix-master-0.rockspec index 2beb608baf4a..e548311acd88 100644 --- a/apisix-master-0.rockspec +++ b/apisix-master-0.rockspec @@ -32,6 +32,7 @@ description = { dependencies = { "lua-resty-ctxdump = 0.1-0", + "api7-lua-resty-redis-connector = 0.12.0", "lyaml = 6.2.8-1", "api7-lua-resty-dns-client = 7.1.1-0", "lua-resty-template = 2.0-1", diff --git a/apisix/cli/config.lua b/apisix/cli/config.lua index 4a3fa3534cb7..c5cebd4b4560 100644 --- a/apisix/cli/config.lua +++ b/apisix/cli/config.lua @@ -169,6 +169,7 @@ local _M = { ["balancer-ewma-locks"] = "10m", ["balancer-ewma-last-touched-at"] = "10m", ["plugin-limit-req-redis-cluster-slot-lock"] = "1m", + ["plugin-limit-count-lock"] = "1m", ["plugin-limit-count-redis-cluster-slot-lock"] = "1m", ["plugin-limit-conn-redis-cluster-slot-lock"] = "1m", ["plugin-ai-rate-limiting"] = "10m", diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index b823bd182cf7..51c2a3222956 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -323,6 +323,7 @@ http { {% if enabled_plugins["limit-count"] then %} lua_shared_dict plugin-limit-count {* http.lua_shared_dict["plugin-limit-count"] *}; + lua_shared_dict plugin-limit-count-lock {* http.lua_shared_dict["plugin-limit-count-lock"] *}; lua_shared_dict plugin-limit-count-redis-cluster-slot-lock {* http.lua_shared_dict["plugin-limit-count-redis-cluster-slot-lock"] *}; lua_shared_dict plugin-limit-count-reset-header {* http.lua_shared_dict["plugin-limit-count"] *}; {% end %} diff --git a/apisix/plugins/limit-count/delayed-syncer.lua b/apisix/plugins/limit-count/delayed-syncer.lua new file mode 100644 index 000000000000..757d7d5e96bd --- /dev/null +++ b/apisix/plugins/limit-count/delayed-syncer.lua @@ -0,0 +1,433 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local resty_lock = require "resty.lock" +local cjson_safe = require "cjson.safe" +local table_new = require("table.new") +local table_nkeys = require("table.nkeys") + +local setmetatable = setmetatable +local tostring = tostring +local pairs = pairs +local ipairs = ipairs +local tab_insert = table.insert +local type = type +local pcall = pcall + +local ngx_now = ngx.now +local ngx_shared = ngx.shared +local worker_id = ngx.worker.id +local ngx_worker = ngx.worker +local ngx_sleep = ngx.sleep +local ngx_timer = ngx.timer + + +local KEY_PREFIX_LOCKER = "locker#" +local KEY_PREFIX_LOCAL_DELTA = "local_delta#" -- delta since last time sync with redis +local KEY_PREFIX_LOCAL_DELTA_KEYS = "local_delta_keys#" -- keys to be sync with redis next time + -- per plugin instance timer, in server instance dimension +local KEY_PREFIX_SYNC_TIMER = "sync_timer#" +local KEY_PREFIX_REMOTE_QUOTA = "remote_quota#" -- save remaining/reset/sync_at in JSON format + +local time_to_sync_records = {} + +local _M = {} + +local mt = { + __index = _M +} + + +function _M.build_key(self, prefix, key) + if self.shd_per_worker then + return prefix .. worker_id() .. "#" .. key + end + return prefix .. key +end + + +function _M.key_locker(self, key) + return self:build_key(KEY_PREFIX_LOCKER, key) +end + + +function _M.key_local_delta(self, key) + return self:build_key(KEY_PREFIX_LOCAL_DELTA, key) +end + + +function _M.key_local_delta_keys(self, syncer_id) + return self:build_key(KEY_PREFIX_LOCAL_DELTA_KEYS, syncer_id) +end + + +function _M.key_sync_timer(self, syncer_id) + return self:build_key(KEY_PREFIX_SYNC_TIMER, syncer_id) +end + + +function _M.key_remote_quota(self, key) + return self:build_key(KEY_PREFIX_REMOTE_QUOTA, key) +end + + +function _M.sync_to_shm(self, key, remaining, reset, local_delta) + local quota = { + remaining = remaining, + reset = reset, + sync_at = ngx_now(), + } + + local _, err, quota_json + + quota_json, err = cjson_safe.encode(quota) + if err then + core.log.error("encode remote_quota to json failed: ", err) + return err + end + + _, err = self.shd:set(self:key_remote_quota(key), quota_json, 2 * self.window) + if err then + core.log.error("set remote quota to shm failed: ", err, ", key: ", key) + return err + end + + _, err = self.shd:incr(self:key_local_delta(key), -local_delta, 0, 2 * self.window) + if err then + core.log.error("incr local delta shm to failed: ", err, ", key: ", key) + return err + end +end + + +function _M.release(self, syncer_id) + self.shd:delete(self:key_sync_timer(syncer_id)) +end + + +function _M.delayed_sync(self, key, cost, syncer_id) + local locker, err = resty_lock:new(self.lock_shdict_name) + if not locker then + core.log.error("new resty locker failed: ", err, ", syncer_id: ", syncer_id) + return nil, nil, err + end + + local elapsed + elapsed, err = locker:lock(self:key_locker(key)) + if err then + core.log.error("lock key(" .. key .. ") failed: ", err, ", elapsed: ", elapsed) + return nil, nil, err + end + + -- wrap the delayed syncer call in a pcall to avoid the lock being held forever + local ok, remaining, reset, err = pcall(self._delayed_sync, self, key, cost, syncer_id) + if not ok then + err = remaining + remaining = nil + core.log.error("delayed sync failed: ", err, ", key: ", key) + end + + local ok, err_unlock = locker:unlock() + if not ok then + core.log.error("unlock key(" .. key .. ") failed: ", err_unlock) + end + + return remaining, reset, err +end + + +function _M._delayed_sync(self, key, cost, syncer_id) + local _, reset, remote_quota_json + local local_delta, err = self.shd:get(self:key_local_delta(key)) + if err then + return nil, nil, err + end + if not local_delta then + local_delta = 0 + end + + remote_quota_json, err = self.shd:get(self:key_remote_quota(key)) + if err then + return nil, nil, err + end + + core.log.info("trying to delayed sync, key: ", key, + ", local_delta: ", local_delta, + ", cost: ", cost, + ", syncer_id: ", syncer_id) + + local remote_remaining, remote_reset, sync_at, quota + if remote_quota_json then + core.log.info("remote_quota_json: ", remote_quota_json) + quota, err = cjson_safe.decode(tostring(remote_quota_json)) + if err then + core.log.error("decode remote_quota_json failed: ", err) + return nil, nil, err + end + + remote_remaining, remote_reset, sync_at = quota.remaining, quota.reset, quota.sync_at + reset = remote_reset - (ngx_now() - sync_at) + if reset < 0 then + reset = 0 -- flag that indicates needing to sync with redis + time_to_sync_records[syncer_id] = nil + end + end + + if not remote_quota_json or 0 == reset then + remote_remaining = 0 + remote_reset = 0 + local remaining_or_err + _, remaining_or_err, reset = self.limiter:incoming(key, local_delta) + if type(remaining_or_err) ~= "string" then + remote_remaining = remaining_or_err + remote_reset = reset + elseif remaining_or_err ~= "rejected" then + core.log.error("sync to redis failed: ", remaining_or_err, ", key: ", key) + if self.limiter.fallback_limiter then + core.log.warn("try use fallback limiter to do rate limiting") + _, remaining_or_err, reset = + self.limiter.fallback_limiter:incoming(key, local_delta) + if type(remaining_or_err) ~= "string" then + remote_remaining = remaining_or_err + remote_reset = reset + elseif remaining_or_err ~= "rejected" then + core.log.error("sync to fallback_limiter failed: ", + remaining_or_err, ", key: ", key) + else + remote_remaining = 0 + remote_reset = reset + end + else + return nil, nil, remaining_or_err + end + else + -- rejected: rate limit exceeded on Redis + remote_remaining = 0 + remote_reset = reset + end + + core.log.info("sync to shm, key: ", key, ", remote_remaining: ", remote_remaining, + ", remote_reset: ", remote_reset) + err = self:sync_to_shm(key, remote_remaining, remote_reset, local_delta) + if err then + return nil, nil, err + end + end + + _, err = self.shd:lpush(self:key_local_delta_keys(syncer_id), key) + if err then + core.log.error("put the keys to be synchronized to redis into the queue failed: ", + err, ", key: ", key) + return nil, nil, err + end + + local key_sync_timer = self:key_sync_timer(syncer_id) + + -- timer has not started or has already triggered, try starting a new one + local now = ngx_now() + if not time_to_sync_records[syncer_id] or time_to_sync_records[syncer_id] <= now then + local time_to_sync = now + self.sync_interval + -- nginx server instance dimension, each plug-in instance corresponds to a timer + -- shd:add - ensure only one worker can start timer + local success + success, err = self.shd:add(key_sync_timer, time_to_sync) + if success then + -- start timer ASAP + local ok, err = ngx_timer.at( + 0, + function (premature) + if not premature then + local ok, err = pcall(self.sync, self, syncer_id, time_to_sync) + if not ok then + core.log.error("sync failed: ", err, ", syncer_id: ", syncer_id) + end + end + self:release(syncer_id) + end + ) + if not ok then + local running = ngx_timer.running_count() + local pending = ngx_timer.pending_count() + core.log.error("failed to create timer: ", err, ", running_count: ", running, + ", pending_count: ", pending, ", syncer_id: ", syncer_id) + self:release(syncer_id) + else + time_to_sync_records[syncer_id] = time_to_sync + end + elseif err == "exists" then + -- other workers + time_to_sync_records[syncer_id], err = self.shd:get(key_sync_timer) + if err then + core.log.error("get sync timer created time failed: ", err) + end + else + core.log.error("try starting new timer failed: ", err) + return nil, nil, err + end + end + + local remaining = remote_remaining - local_delta - cost + if 0 <= remaining then + _, err = self.shd:incr(self:key_local_delta(key), cost, 0, 2 * self.window) + if err then + core.log.error("incr local delta to shm failed: ", err, ", key: ", key) + return nil, nil, err + end + end + + return remaining, reset +end + + +local function sync_key(self, key) + local delta, err = self.shd:get(self:key_local_delta(key)) + if err then + core.log.error("get local delta from shm failed: ", err) + end + + if delta then + local _, remaining_or_err, reset = self.limiter:incoming(key, delta) + -- compat + if type(remaining_or_err) ~= "string" then + self:sync_to_shm(key, remaining_or_err, reset, delta) + elseif remaining_or_err ~= "rejected" then + core.log.error("sync to redis failed: ", remaining_or_err, ", key: ", key) + if self.limiter.fallback_limiter then + core.log.warn("try use fallback limiter to do rate limiting") + if delta < 1 then + delta = 1 + end + _, remaining_or_err, reset = + self.limiter.fallback_limiter:incoming(key, delta) + if type(remaining_or_err) ~= "string" then + self:sync_to_shm(key, remaining_or_err, reset, delta) + elseif remaining_or_err ~= "rejected" then + core.log.error("sync to fallback_limiter failed: ", + remaining_or_err, ", key: ", key) + else + self:sync_to_shm(key, 0, reset, delta) + end + end + else + self:sync_to_shm(key, 0, reset, delta) + end + end +end + + +function _M.sync(self, syncer_id, time_to_sync) + local key_local_delta_keys = self:key_local_delta_keys(syncer_id) -- name of keys queue + local local_delta_keys_dedup = {} -- duplicate removal + while not ngx_worker.exiting() and time_to_sync > ngx_now() do + local key, err = self.shd:rpop(key_local_delta_keys) + if err then + core.log.error("shdict.rpop failed: ", err, ", syncer_id: ", syncer_id) + return + end + if key then + if not local_delta_keys_dedup[key] then + local_delta_keys_dedup[key] = true + end + else + ngx_sleep(0.001) + end + end + + if ngx_worker.exiting() then + core.log.info("sync interrupted due to worker exit") + return + end + + -- drain all remaining keys from the queue + local key = {} + while key ~= nil do + local err + key, err = self.shd:rpop(key_local_delta_keys) + if err then + core.log.error("shdict.rpop failed: ", err, ", syncer_id: ", syncer_id) + return + end + + if key then + if not local_delta_keys_dedup[key] then + local_delta_keys_dedup[key] = true + end + end + end + + local nkeys = table_nkeys(local_delta_keys_dedup) + local local_delta_keys_uniq = table_new(nkeys, 0) + + core.log.info(nkeys, " keys to be sync, time_to_sync: ", time_to_sync) + + for key, _ in pairs(local_delta_keys_dedup) do + tab_insert(local_delta_keys_uniq, key) + end + + local locker, err = resty_lock:new(self.lock_shdict_name) + if not locker then + core.log.error("new resty locker failed: ", err, ", syncer_id: ", syncer_id) + return + end + + local elapsed + for _, key in ipairs(local_delta_keys_uniq) do + elapsed, err = locker:lock(self:key_locker(key)) + if err then + core.log.error("lock key(" .. key .. ") failed: ", err, ", elapsed: ", elapsed) + return + end + + local ok, err = pcall(sync_key, self, key) + if not ok then + core.log.error("sync failed: ", err, ", key: ", key) + end + + local ok, err_unlock = locker:unlock() + if not ok then + core.log.error("unlock key(" .. key .. ") failed: ", err_unlock) + end + end +end + + +function _M.new(shdict_name, limit, window, conf, limiter) + local shd = ngx_shared[shdict_name] + if not shd then + return nil, "shared dict (" .. shdict_name .. ") not found" + end + local lock_shdict_name = shdict_name .. "-lock" + local self = { + shdict_name = shdict_name, + lock_shdict_name = lock_shdict_name, + shd = shd, + conf = conf, + limit = limit, + window = window, + limiter = limiter, + sync_interval = conf.sync_interval, + } + -- self.shd_per_worker = true: simulate multiple nginx server instance + if conf._shd_per_worker then + self.shd_per_worker = true + end + return setmetatable(self, mt) +end + + +return _M diff --git a/apisix/plugins/limit-count/init.lua b/apisix/plugins/limit-count/init.lua index ea1d0158e5a1..47eb983bf47b 100644 --- a/apisix/plugins/limit-count/init.lua +++ b/apisix/plugins/limit-count/init.lua @@ -19,17 +19,19 @@ local apisix_plugin = require("apisix.plugin") local tab_insert = table.insert local ipairs = ipairs local pairs = pairs -local redis_schema = require("apisix.utils.redis-schema") -local policy_to_additional_properties = redis_schema.schema -local get_phase = ngx.get_phase local tonumber = tonumber local type = type local tostring = tostring -local str_format = string.format -local error = error +local redis_schema = require("apisix.utils.redis-schema") +local get_phase = ngx.get_phase +local math_floor = math.floor + +local NO_DELAYED_SYNC = -1 +local policy_to_additional_properties = core.table.deepcopy(redis_schema.schema) local limit_redis_cluster_new local limit_redis_new +local limit_redis_sentinel_new local limit_local_new do local local_src = "apisix.plugins.limit-count.limit-count-local" @@ -40,10 +42,48 @@ do local cluster_src = "apisix.plugins.limit-count.limit-count-redis-cluster" limit_redis_cluster_new = require(cluster_src).new + + local sentinel_src = "apisix.plugins.limit-count.limit-count-redis-sentinel" + limit_redis_sentinel_new = require(sentinel_src).new end local group_conf_lru = core.lrucache.new({ type = 'plugin', }) +local group_limit_lru = core.lrucache.new({type = 'plugin'}) +local lrucache = core.lrucache.new({type = 'plugin', serial_creating = true}) + +policy_to_additional_properties["redis-sentinel"] = { + properties = { + redis_sentinels = { + type = "array", + minItems = 1, + items = { + type = "object", + properties = { + host = {type = "string", minLength = 2}, + port = {type = "integer", minimum = 1, maximum = 65535}, + }, + required = {"host", "port"}, + additionalProperties = false, + }, + }, + redis_master_name = {type = "string", minLength = 1}, + redis_role = { + type = "string", + enum = {"master", "slave"}, + default = "master", + }, + redis_connect_timeout = {type = "integer", minimum = 1, default = 1000}, + redis_read_timeout = {type = "integer", minimum = 1, default = 1000}, + redis_keepalive_timeout = {type = "integer", minimum = 1, default = 60000}, + redis_database = {type = "integer", minimum = 0, default = 0}, + redis_username = {type = "string", minLength = 1}, + redis_password = {type = "string", minLength = 0}, + sentinel_username = {type = "string", minLength = 1}, + sentinel_password = {type = "string", minLength = 0}, + }, + required = {"redis_sentinels", "redis_master_name"}, +} local metadata_defaults = { limit_header = "X-RateLimit-Limit", @@ -84,6 +124,10 @@ local schema = { {type = "string"}, }, }, + window_type = { + type = "string", + enum = {"fixed", "sliding"}, + }, rules = { type = "array", items = { @@ -124,11 +168,14 @@ local schema = { }, policy = { type = "string", - enum = {"local", "redis", "redis-cluster"}, + enum = {"local", "redis", "redis-cluster", "redis-sentinel"}, default = "local", }, allow_degradation = {type = "boolean", default = false}, - show_limit_quota_header = {type = "boolean", default = true} + show_limit_quota_header = {type = "boolean", default = true}, + sync_interval = { + type = "number", + } }, oneOf = { { @@ -155,6 +202,16 @@ local schema = { }, }, ["then"] = policy_to_additional_properties["redis-cluster"], + ["else"] = { + ["if"] = { + properties = { + policy = { + enum = {"redis-sentinel"}, + }, + }, + }, + ["then"] = policy_to_additional_properties["redis-sentinel"], + } } } @@ -178,9 +235,23 @@ function _M.check_schema(conf, schema_type) local ok, err = core.schema.check(schema, conf) if not ok then + -- oneOf conflict: both count/time_window and rules are present + if err and err:find("value should match only one schema", 1, true) then + if (conf.count or conf.time_window) and conf.rules then + return false, "count/time_window and rules cannot be specified at the same time" + end + end return false, err end + if conf.rules and (conf.count or conf.time_window) then + return false, "count/time_window and rules cannot be specified at the same time" + end + + if conf.group and conf.rules then + return false, "group and rules cannot be specified at the same time" + end + if conf.group then -- means that call by some plugin not support if conf._vid then @@ -214,12 +285,18 @@ function _M.check_schema(conf, schema_type) end end - local keys = {} - for _, rule in ipairs(conf.rules or {}) do - if keys[rule.key] then - return false, str_format("duplicate key '%s' in rules", rule.key) + if conf.policy == "redis" or conf.policy == "redis-cluster" or + conf.policy == "redis-sentinel" + then + if conf.sync_interval and conf.sync_interval ~= NO_DELAYED_SYNC then + if conf.sync_interval < 0.1 then + return false, "sync_interval should not be smaller than 0.1" + end + + if type(conf.time_window) == "number" and conf.sync_interval >= conf.time_window then + return false, "sync_interval should be smaller than time_window" + end end - keys[rule.key] = true end return true @@ -228,11 +305,14 @@ end local function create_limit_obj(conf, rule, plugin_name) core.log.info("create new ", plugin_name, " plugin instance", + ", policy: ", conf.policy, + ", window_type: ", conf.window_type, + ", sync_interval: ", conf.sync_interval, ", rule: ", core.json.delay_encode(rule, true)) if not conf.policy or conf.policy == "local" then return limit_local_new("plugin-" .. plugin_name, rule.count, - rule.time_window) + rule.time_window, conf.window_type) end if conf.policy == "redis" then @@ -244,6 +324,11 @@ local function create_limit_obj(conf, rule, plugin_name) rule.time_window, conf) end + if conf.policy == "redis-sentinel" then + return limit_redis_sentinel_new("plugin-" .. plugin_name, rule.count, + rule.time_window, conf) + end + return nil end @@ -259,7 +344,8 @@ local function gen_limit_key(conf, ctx, key) -- A route which reuses a previous route's ID will inherits its counter. local parent = conf._meta and conf._meta.parent if not parent or not parent.resource_key then - error("failed to generate key invalid parent: ", core.json.encode(parent)) + core.log.error("failed to generate key invalid parent: ", core.json.encode(parent)) + return nil end local new_key = parent.resource_key .. ':' .. apisix_plugin.conf_version(conf) @@ -276,10 +362,11 @@ end local function resolve_var(ctx, value) if type(value) == "string" then + local original_value = value local err, _ value, err, _ = core.utils.resolve_var(value, ctx.var) if err then - return nil, "could not resolve var for value: " .. value .. ", err: " .. err + return nil, "could not resolve var for value: " .. original_value .. ", err: " .. err end value = tonumber(value) if not value then @@ -339,17 +426,18 @@ end -local function construct_rate_limiting_headers(conf, name, rule, metadata) - local prefix = "X-" - if name == "ai-rate-limiting" then - prefix = "X-AI-" - end - +local function construct_rate_limiting_headers(conf, rule, metadata) if rule.header_prefix then + local base_limit = conf.limit_header or metadata.limit_header + local base_remaining = conf.remaining_header or metadata.remaining_header + local base_reset = conf.reset_header or metadata.reset_header + -- Insert rule prefix before "RateLimit-" to preserve any custom header base + -- e.g. "X-AI-RateLimit-Limit" + prefix "1" -> "X-AI-1-RateLimit-Limit" + local prefix = tostring(rule.header_prefix) return { - limit_header = prefix .. rule.header_prefix .. "-RateLimit-Limit", - remaining_header = prefix .. rule.header_prefix .. "-RateLimit-Remaining", - reset_header = prefix .. rule.header_prefix .. "-RateLimit-Reset", + limit_header = base_limit:gsub("RateLimit%-", prefix .. "-RateLimit-", 1), + remaining_header = base_remaining:gsub("RateLimit%-", prefix .. "-RateLimit-", 1), + reset_header = base_reset:gsub("RateLimit%-", prefix .. "-RateLimit-", 1), } end return { @@ -361,7 +449,22 @@ end local function run_rate_limit(conf, rule, ctx, name, cost, dry_run) - local lim, err = create_limit_obj(conf, rule, name) + local lim, err + if conf.group then + lim, err = group_limit_lru(conf.group, "", create_limit_obj, conf, conf, name) + elseif not conf.rules + and type(conf.count) == "number" + and type(conf.time_window) == "number" + then + local key = name .. "#" .. (conf.policy or "local") + if conf._vid then + key = key .. "#" .. conf._vid + end + lim, err = core.lrucache.plugin_ctx(lrucache, ctx, key, + create_limit_obj, conf, conf, name) + else + lim, err = create_limit_obj(conf, rule, name) + end if not lim then core.log.error("failed to fetch limit.count object: ", err) @@ -396,28 +499,50 @@ local function run_rate_limit(conf, rule, ctx, name, cost, dry_run) end key = gen_limit_key(conf, ctx, key) - core.log.info("limit key: ", key) + if not key then + return 500 + end + core.log.info("limit key: ", key, ", count: ", rule.count, + ", time_window: ", rule.time_window) + + local phase = get_phase() + local is_log_phase = phase == "log" + local commit_cost = dry_run and 0 or cost local delay, remaining, reset if not conf.policy or conf.policy == "local" then - if dry_run then - -- peek with cost=0 and commit=false to avoid side effects: - -- dict:get reads without creating or incrementing the key - delay, remaining, reset = lim:incoming(key, false, conf, 0) - if type(remaining) == "number" and remaining - cost < 0 then - delay = nil - remaining = "rejected" + delay, remaining, reset = lim:incoming(key, commit_cost) + else + local enable_delayed_sync = conf.sync_interval and (conf.sync_interval ~= NO_DELAYED_SYNC) + -- a dynamic time_window may resolve to a value <= sync_interval at request + -- time, which would break delayed-sync semantics; fall back to direct sync + if enable_delayed_sync and rule.time_window <= conf.sync_interval then + enable_delayed_sync = false + end + if is_log_phase then + lim:log_phase_incoming(key, commit_cost) + return + elseif enable_delayed_sync then + local extra_key = name .. '#' .. conf.policy + if conf._vid then + extra_key = extra_key .. '#' .. conf._vid end + local plugin_instance_id = core.lrucache.plugin_ctx_id(ctx, extra_key) + delay, remaining, reset = lim:incoming_delayed(key, commit_cost, plugin_instance_id) else - delay, remaining, reset = lim:incoming(key, true, conf, cost) + delay, remaining, reset = lim:incoming(key, commit_cost) end - else - delay, remaining, reset = lim:incoming(key, cost) end + if dry_run and type(remaining) == "number" and remaining - cost < 0 then + delay = nil + remaining = "rejected" + end + reset = reset and (math_floor(reset * 100) / 100) + core.utils.set_var_rate_limiting_info(ctx, key, lim.limit, remaining, reset) - local metadata = apisix_plugin.plugin_metadata("limit-count") + local metadata = apisix_plugin.plugin_metadata(name) if metadata then metadata = metadata.value else @@ -425,9 +550,8 @@ local function run_rate_limit(conf, rule, ctx, name, cost, dry_run) end core.log.info("limit-count plugin-metadata: ", core.json.delay_encode(metadata)) - local set_limit_headers = construct_rate_limiting_headers(conf, name, rule, metadata) - local phase = get_phase() - local set_header = phase ~= "log" + local set_limit_headers = construct_rate_limiting_headers(conf, rule, metadata) + local set_header = not is_log_phase if not delay then local err = remaining @@ -435,8 +559,8 @@ local function run_rate_limit(conf, rule, ctx, name, cost, dry_run) -- show count limit header when rejected if conf.show_limit_quota_header and set_header then core.response.set_header(set_limit_headers.limit_header, lim.limit, - set_limit_headers.remaining_header, 0, - set_limit_headers.reset_header, reset) + set_limit_headers.remaining_header, 0, + set_limit_headers.reset_header, reset) end if conf.rejected_msg then diff --git a/apisix/plugins/limit-count/limit-count-local.lua b/apisix/plugins/limit-count/limit-count-local.lua index 87d1059d7a17..d6d0eb7d6a9d 100644 --- a/apisix/plugins/limit-count/limit-count-local.lua +++ b/apisix/plugins/limit-count/limit-count-local.lua @@ -15,10 +15,13 @@ -- limitations under the License. -- local limit_count = require("resty.limit.count") +local sliding_window = require("apisix.plugins.limit-count.sliding-window.sliding-window") +local shared_dict_store = require("apisix.plugins.limit-count.sliding-window." + .. "store.shared-dict") local ngx = ngx local type = type -local ngx_time = ngx.time +local ngx_now = ngx.now local assert = assert local setmetatable = setmetatable local core = require("apisix.core") @@ -31,7 +34,7 @@ local mt = { local function set_endtime(self, key, time_window) -- set an end time - local end_time = ngx_time() + time_window + local end_time = ngx_now() + time_window -- save to dict by key local success, err = self.dict:set(key, end_time, time_window) @@ -46,28 +49,65 @@ end local function read_reset(self, key) -- read from dict local end_time = (self.dict:get(key) or 0) - local reset = end_time - ngx_time() + local reset = end_time - ngx_now() if reset < 0 then reset = 0 end return reset end -function _M.new(plugin_name, limit, window) +function _M.new(plugin_name, limit, window, window_type) assert(limit > 0 and window > 0) + if window_type == "sliding" then + local shd_store, err = shared_dict_store.new({name = plugin_name}) + if not shd_store then + return nil, err + end + + local sw_limit_count + sw_limit_count, err = sliding_window.new(shd_store, limit, window) + + if not sw_limit_count then + return nil, err + end + + local self = { + limit = limit, + window = window, + window_type = window_type, + limit_count = sw_limit_count, + } + + return setmetatable(self, mt) + end + local self = { - limit_count = limit_count.new(plugin_name, limit, window), - dict = ngx.shared[plugin_name .. "-reset-header"], limit = limit, window = window, + window_type = window_type, + limit_count = limit_count.new(plugin_name, limit, window), + dict = ngx.shared[plugin_name .. "-reset-header"] } return setmetatable(self, mt) end -function _M.incoming(self, key, commit, conf, cost) - local delay, consumed_or_err = self.limit_count:incoming(key, commit, cost) +function _M.incoming(self, key, flag_or_cost, _conf, cost_arg) + local cost + if type(flag_or_cost) == "boolean" then + -- old API: incoming(key, flag, conf, cost) + cost = cost_arg + else + -- new API: incoming(key, cost) + cost = flag_or_cost + end + + if self.window_type == "sliding" then + return self.limit_count:incoming(key, cost) + end + + local delay, consumed_or_err = self.limit_count:incoming(key, true, cost) local reset local remaining_or_err = consumed_or_err diff --git a/apisix/plugins/limit-count/limit-count-redis-cluster.lua b/apisix/plugins/limit-count/limit-count-redis-cluster.lua index 89a823e61f45..a2cd027e198c 100644 --- a/apisix/plugins/limit-count/limit-count-redis-cluster.lua +++ b/apisix/plugins/limit-count/limit-count-redis-cluster.lua @@ -17,73 +17,111 @@ local redis_cluster = require("apisix.utils.rediscluster") local core = require("apisix.core") -local to_hex = require("resty.string").to_hex +local delayed_syncer = require("apisix.plugins.limit-count.delayed-syncer") +local sliding_window = require("apisix.plugins.limit-count.sliding-window.sliding-window") +local sliding_window_store = require("apisix.plugins.limit-count." + .. "sliding-window.store.redis") +local limit_count_local = require("apisix.plugins.limit-count.limit-count-local") +local util = require("apisix.plugins.limit-count.util") + +local timer_at = ngx.timer.at local setmetatable = setmetatable -local tostring = tostring local _M = {} - local mt = { __index = _M } - -local script = core.string.compress_script([=[ - assert(tonumber(ARGV[3]) >= 1, "cost must be at least 1") - local ttl = redis.call('ttl', KEYS[1]) - if ttl < 0 then - redis.call('set', KEYS[1], ARGV[1] - ARGV[3], 'EX', ARGV[2]) - return {ARGV[1] - ARGV[3], ARGV[2]} - end - return {redis.call('incrby', KEYS[1], 0 - ARGV[3]), ttl} -]=]) -local script_sha = to_hex(ngx.sha1_bin(script)) - - function _M.new(plugin_name, limit, window, conf) local red_cli, err = redis_cluster.new(conf, "plugin-limit-count-redis-cluster-slot-lock") if not red_cli then return nil, err end + local fallback_limiter, fallback_err = limit_count_local.new(plugin_name, + limit, window, + conf.window_type) + if not fallback_limiter then + return nil, fallback_err + end + + local enable_delayed_sync = conf.sync_interval and conf.sync_interval ~= -1 + + if conf.window_type == "sliding" then + local sw_limit_count + sw_limit_count, err = sliding_window.new(sliding_window_store, limit, window, red_cli) + if not sw_limit_count then + return nil, err + end + + sw_limit_count.fallback_limiter = fallback_limiter + local self = { + limit = limit, + window_type = conf.window_type, + limit_count = sw_limit_count, + } + + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, + conf, self.limit_count) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) + end + local self = { limit = limit, window = window, conf = conf, plugin_name = plugin_name, red_cli = red_cli, + fallback_limiter = fallback_limiter, } + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, conf, self) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) end - -function _M.incoming(self, key, cost) - local red = self.red_cli - local limit = self.limit - local window = self.window - key = self.plugin_name .. tostring(key) - - local ttl = 0 - local res, err = red:evalsha(script_sha, 1, key, limit, window, cost or 1) - if err and core.string.has_prefix(err, "NOSCRIPT") then - core.log.warn("redis evalsha failed: ", err, ". Falling back to eval") - res, err = red:eval(script, 1, key, limit, window, cost or 1) +function _M.incoming_delayed(self, key, cost, syncer_id) + local remaining, reset, err = self.delayed_syncer:delayed_sync(key, cost, syncer_id) + if not remaining then + return nil, err, 0 + end + if remaining < 0 then + return nil, "rejected", reset end + return 0, remaining, reset +end - if err then - return nil, err, ttl +function _M.incoming(self, key, cost) + if self.window_type == "sliding" then + return self.limit_count:incoming(key, cost) end - local remaining = res[1] - ttl = res[2] + return util.redis_incoming(self, key, cost, false) +end - if remaining < 0 then - return nil, "rejected", ttl +function _M.log_phase_incoming(self, key, cost) + local ok, err = timer_at(0, function () + local delay, incoming_err = self:incoming(key, cost) + if not delay and incoming_err ~= "rejected" then + core.log.error("failed to sync limit count in log phase: ", incoming_err) + end + end) + if not ok then + core.log.error("failed to schedule timer: ", err) end - return 0, remaining, ttl end - return _M diff --git a/apisix/plugins/limit-count/limit-count-redis-sentinel.lua b/apisix/plugins/limit-count/limit-count-redis-sentinel.lua new file mode 100644 index 000000000000..ec533462a987 --- /dev/null +++ b/apisix/plugins/limit-count/limit-count-redis-sentinel.lua @@ -0,0 +1,128 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local delayed_syncer = require("apisix.plugins.limit-count.delayed-syncer") +local sliding_window = require("apisix.plugins.limit-count.sliding-window.sliding-window") +local sliding_window_store = require("apisix.plugins.limit-count." + .. "sliding-window.store.redis") +local limit_count_local = require("apisix.plugins.limit-count.limit-count-local") +local redis_cli_sentinel = require("apisix.plugins.limit-count.util").redis_cli_sentinel +local util = require("apisix.plugins.limit-count.util") +local timer_at = ngx.timer.at +local core = require("apisix.core") +local assert = assert +local setmetatable = setmetatable +local _M = {} + +local mt = { + __index = _M +} + + +function _M.new(plugin_name, limit, window, conf) + assert(limit > 0 and window > 0) + local fallback_limiter, err = limit_count_local.new(plugin_name, + limit, window, conf.window_type) + if not fallback_limiter then + return nil, err + end + + local enable_delayed_sync = conf.sync_interval and conf.sync_interval ~= -1 + + if conf.window_type == "sliding" then + local sw_limit_count, err = sliding_window.new_with_red_cli_factory(sliding_window_store, + limit, window, redis_cli_sentinel, conf) + if not sw_limit_count then + return nil, err + end + sw_limit_count.fallback_limiter = fallback_limiter + local self = { + limit = limit, + window_type = conf.window_type, + limit_count = sw_limit_count, + } + + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, + conf, self.limit_count) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) + end + + local self = { + limit = limit, + window = window, + conf = conf, + plugin_name = plugin_name, + fallback_limiter = fallback_limiter, + } + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, conf, self) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) +end + + +function _M.incoming_delayed(self, key, cost, syncer_id) + local remaining, reset, err = self.delayed_syncer:delayed_sync(key, cost, syncer_id) + if not remaining then + return nil, err, 0 + end + if remaining < 0 then + return nil, "rejected", reset + end + return 0, remaining, reset +end + + +function _M.incoming(self, key, cost) + if self.window_type == "sliding" then + return self.limit_count:incoming(key, cost) + end + + local conf = self.conf + local red, err = redis_cli_sentinel(conf) + if not red then + return nil, err, 0 + end + self.red_cli = red + return util.redis_incoming(self, key, cost, true) +end + + +function _M.log_phase_incoming(self, key, cost) + local ok, err = timer_at(0, function () + local delay, err = self:incoming(key, cost) + if not delay then + if err ~= "rejected" then + core.log.error("failed to sync limit count in log phase: ", err) + end + end + end) + if not ok then + core.log.error("failed to schedule timer: ", err) + end +end + +return _M diff --git a/apisix/plugins/limit-count/limit-count-redis.lua b/apisix/plugins/limit-count/limit-count-redis.lua index 5580b22a0668..bdabe7148325 100644 --- a/apisix/plugins/limit-count/limit-count-redis.lua +++ b/apisix/plugins/limit-count/limit-count-redis.lua @@ -14,82 +14,122 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- -local redis = require("apisix.utils.redis") local core = require("apisix.core") -local to_hex = require("resty.string").to_hex +local delayed_syncer = require("apisix.plugins.limit-count.delayed-syncer") +local sliding_window = require("apisix.plugins.limit-count.sliding-window.sliding-window") +local sliding_window_store = require("apisix.plugins.limit-count." + .. "sliding-window.store.redis") +local limit_count_local = require("apisix.plugins.limit-count.limit-count-local") +local util = require("apisix.plugins.limit-count.util") +local redis_cli = util.redis_cli + local assert = assert local setmetatable = setmetatable -local tostring = tostring - +local timer_at = ngx.timer.at local _M = {version = 0.3} - local mt = { __index = _M } +function _M.new(plugin_name, limit, window, conf) + assert(limit > 0 and window > 0) -local script = core.string.compress_script([=[ - assert(tonumber(ARGV[3]) >= 1, "cost must be at least 1") - local ttl = redis.call('ttl', KEYS[1]) - if ttl < 0 then - redis.call('set', KEYS[1], ARGV[1] - ARGV[3], 'EX', ARGV[2]) - return {ARGV[1] - ARGV[3], ARGV[2]} + local fallback_limiter, err = limit_count_local.new(plugin_name, + limit, window, conf.window_type) + if not fallback_limiter then + return nil, err end - return {redis.call('incrby', KEYS[1], 0 - ARGV[3]), ttl} -]=]) -local script_sha = to_hex(ngx.sha1_bin(script)) - -function _M.new(plugin_name, limit, window, conf) - assert(limit > 0 and window > 0) + local enable_delayed_sync = conf.sync_interval and conf.sync_interval ~= -1 + + if conf.window_type == "sliding" then + local sw_limit_count + sw_limit_count, err = sliding_window.new_with_red_cli_factory(sliding_window_store, + limit, window, + redis_cli, conf) + if not sw_limit_count then + return nil, err + end + + sw_limit_count.fallback_limiter = fallback_limiter + local self = { + limit = limit, + window_type = conf.window_type, + limit_count = sw_limit_count, + } + + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, + conf, self.limit_count) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) + end local self = { limit = limit, window = window, conf = conf, plugin_name = plugin_name, + fallback_limiter = fallback_limiter, } + + if enable_delayed_sync then + local ds, ds_err = delayed_syncer.new(plugin_name, limit, window, conf, self) + if not ds then + return nil, ds_err + end + self.delayed_syncer = ds + end + return setmetatable(self, mt) end -function _M.incoming(self, key, cost) - local conf = self.conf - local red, err = redis.new(conf) - if not red then - return red, err, 0 +function _M.incoming_delayed(self, key, cost, syncer_id) + local remaining, reset, err = self.delayed_syncer:delayed_sync(key, cost, syncer_id) + if not remaining then + return nil, err, 0 + end + if remaining < 0 then + return nil, "rejected", reset end + return 0, remaining, reset +end - local limit = self.limit - local window = self.window - local res - key = self.plugin_name .. tostring(key) +function _M.incoming(self, key, cost) + if self.window_type == "sliding" then + return self.limit_count:incoming(key, cost) + end - local ttl = 0 - res, err = red:evalsha(script_sha, 1, key, limit, window, cost or 1) - if err and core.string.has_prefix(err, "NOSCRIPT") then - core.log.warn("redis evalsha failed: ", err, ". Falling back to eval") - res, err = red:eval(script, 1, key, limit, window, cost or 1) + local red, err = redis_cli(self.conf) + if not red then + return nil, err, 0 end - if err then - return nil, err, ttl + self.red_cli = red + local delay, remaining, ttl = util.redis_incoming(self, key, cost, true) + if not delay and remaining ~= "rejected" then + return nil, remaining, ttl end - local remaining = res[1] - ttl = res[2] + return delay, remaining, ttl +end - local ok, err = red:set_keepalive(conf.redis_keepalive_timeout, conf.redis_keepalive_pool) +function _M.log_phase_incoming(self, key, cost) + local ok, err = timer_at(0, function () + local delay, incoming_err = self:incoming(key, cost) + if not delay and incoming_err ~= "rejected" then + core.log.error("failed to sync limit count in log phase: ", incoming_err) + end + end) if not ok then - return nil, err, ttl - end - - if remaining < 0 then - return nil, "rejected", ttl + core.log.error("failed to schedule timer: ", err) end - return 0, remaining, ttl end - return _M diff --git a/apisix/plugins/limit-count/sliding-window/sliding-window.lua b/apisix/plugins/limit-count/sliding-window/sliding-window.lua new file mode 100644 index 000000000000..4f2e229f2ac3 --- /dev/null +++ b/apisix/plugins/limit-count/sliding-window/sliding-window.lua @@ -0,0 +1,199 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local tostring = tostring +local string_format = string.format +local math_floor = math.floor +local math_ceil = math.ceil + +local ngx_now = ngx.now +local setmetatable = setmetatable +local log = require("apisix.core.log") + +local _M = {} +local mt = { __index = _M } + +local function round_off_decimal_places(input, places) + local multiplier = 10 ^ places + return math_ceil(input * multiplier) / multiplier +end + + +-- uniquely identifies the window associated with given time +local function get_window_id(self, time) + return tostring(math_floor(time / self.window_size)) +end + + +local function get_counter_key(self, key, time) + local wid = get_window_id(self, time) + return string_format("%s.%s.counter", key, wid) +end + + +local function get_last_rate(self, sample, now_ms, red_cli) + local a_window_ago_from_now = now_ms - self.window_size + local last_counter_key = get_counter_key(self, sample, a_window_ago_from_now) + + local last_count, err = self.store:get(last_counter_key, red_cli) + if err then + return nil, err + end + if not last_count then + last_count = 0 + end + if last_count > self.limit then + -- in incoming we also reactively check for exceeding limit + -- after icnrementing the counter. So even though counter can be higher + -- than the limit as a result of racy behaviour we would still throttle + -- anyway. That is way it is important to correct the last count here + -- to avoid over-punishment. + last_count = self.limit + end + + return last_count / self.window_size +end + + +function _M.new(store, limit, window_size, red_cli) + if not store then + return nil, "'store' parameter is missing" + end + if not store.incr then + return nil, "'store' has to implement 'incr' function" + end + if not store.get then + return nil, "'store' has to implement 'get' function" + end + + return setmetatable({ + store = store, + limit = limit, + window_size = window_size, + red_cli = red_cli + }, mt) +end + + +function _M.new_with_red_cli_factory(store, limit, window_size, red_cli_factory, conf) + if not store then + return nil, "'store' parameter is missing" + end + if not store.incr then + return nil, "'store' has to implement 'incr' function" + end + if not store.get then + return nil, "'store' has to implement 'get' function" + end + + return setmetatable({ + store = store, + limit = limit, + window_size = window_size, + conf = conf, + red_cli_factory = red_cli_factory + }, mt) +end + + +local function get_desired_delay(self, remaining_time, last_rate, count) + if last_rate == 0 then + return remaining_time + end + + local desired_delay = remaining_time - (self.limit - count) / last_rate + + if desired_delay <= 0 then + return 0 + end + + return desired_delay +end + + +function _M.incoming(self, key, cost) + local now = ngx_now() + local counter_key = get_counter_key(self, key, now) + local remaining_time = self.window_size - now % self.window_size + + local red_cli, err + if not self.red_cli and self.red_cli_factory then + red_cli, err = self.red_cli_factory(self.conf) + if not red_cli then + return nil, err, 0 + end + end + + local count, err = self.store:get(counter_key, self.red_cli or red_cli) + if err then + return nil, err + end + if not count then + count = 0 + end + log.debug("count: ", count, ", limit: ", self.limit) + if count >= self.limit then + return nil, "rejected", round_off_decimal_places(remaining_time, 2) + end + + local last_rate + last_rate, err = get_last_rate(self, key, now, self.red_cli or red_cli) + if err then + return nil, err, 0 + end + + local estimated_last_window_count = last_rate * remaining_time + local estimated_final_count = estimated_last_window_count + count + log.debug("estimated_final_count: ", estimated_final_count, ", limit: ", self.limit) + if estimated_final_count >= self.limit then + local desired_delay = + get_desired_delay(self, remaining_time, last_rate, count) + return nil, "rejected", round_off_decimal_places(desired_delay, 2) + end + + local expiry = self.window_size * 2 + local new_count + new_count, err = self.store:incr(counter_key, cost, expiry, self.red_cli or red_cli) + if err then + return nil, err, 0 + end + + if red_cli then + red_cli:set_keepalive(10000, 100) + end + + -- The below limit checking is only to cope with a racy behaviour where + -- counter for the given sample is incremented at the same time by multiple + -- sliding_window instances. That is we re-adjust the new count by ignoring + -- the current occurrence of the sample. Otherwise the limit would + -- unncessarily be exceeding. + local new_adjusted_count = new_count - cost + log.debug("new_adjusted_count: ", new_adjusted_count, ", limit: ", self.limit) + + if new_adjusted_count >= self.limit then + -- incr above might take long enough to make difference, so + -- we recalculate time-dependant variables. + remaining_time = self.window_size - ngx_now() % self.window_size + return nil, "rejected", round_off_decimal_places(remaining_time, 2) + end + + local remaining = self.limit - new_count - estimated_last_window_count + local rounded_remaining = math_floor(remaining) + + return 0, rounded_remaining, round_off_decimal_places(remaining_time, 2) +end + +return _M diff --git a/apisix/plugins/limit-count/sliding-window/store/redis.lua b/apisix/plugins/limit-count/sliding-window/store/redis.lua new file mode 100644 index 000000000000..6ff77d6033ad --- /dev/null +++ b/apisix/plugins/limit-count/sliding-window/store/redis.lua @@ -0,0 +1,72 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local ngx_null = ngx.null +local tonumber = tonumber +local core = require("apisix.core") +local to_hex = require("resty.string").to_hex + +local _M = {} + + +local incr_script = core.string.compress_script([=[ + local ttl = redis.call('pttl', KEYS[1]) + if ttl < 0 then + redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) + return tonumber(ARGV[1]) + end + return redis.call('incrby', KEYS[1], ARGV[1]) +]=]) +local incr_script_sha = to_hex(ngx.sha1_bin(incr_script)) + + +-- TODO: keepalive or close +function _M.incr(self, key, delta, expiry, red) + -- nk key1 argv1 argv2 + local new_value, err + new_value, err = red:evalsha(incr_script_sha, 1, key, delta, expiry) + if err and core.string.has_prefix(err, "NOSCRIPT") then + core.log.warn("redis evalsha failed: ", err, ". Falling back to eval") + new_value, err = red:eval(incr_script, 1, key, delta, expiry) + end + if err then + return nil, err + end + + if not new_value then + return nil, "malformed redis response while calling incr" + end + + return new_value +end + + +-- TODO: keepalive or close +function _M.get(self, key, red) + local value, err = red:get(key) + if not value or value == ngx_null then + return nil, err + end + + value = tonumber(value) + if not value then -- maybe warn log? + return nil, "redis counter is not a number the value could have been modified" + end + + return value, nil +end + +return _M diff --git a/apisix/plugins/limit-count/sliding-window/store/shared-dict.lua b/apisix/plugins/limit-count/sliding-window/store/shared-dict.lua new file mode 100644 index 000000000000..0401bf0b4b1e --- /dev/null +++ b/apisix/plugins/limit-count/sliding-window/store/shared-dict.lua @@ -0,0 +1,64 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local ngx = ngx +local log = require("apisix.core.log") +local string_format = string.format +local setmetatable = setmetatable + +local _M = {} +local mt = { __index = _M } + +function _M.new(options) + if not options.name then + return nil, "shared dictionary name is mandatory" + end + + local dict = ngx.shared[options.name] + if not dict then + return nil, + string_format("shared dictionary with name \"%s\" is not configured", + options.name) + end + + return setmetatable({ + dict = dict, + }, mt) +end + +function _M.incr(self, key, delta, expiry) + local new_value, err, forcible = self.dict:incr(key, delta, 0, expiry) + if err then + return nil, err + end + + if forcible then + log.warn("shared dictionary is full, removed valid key(s) to store the new one") + end + + return new_value +end + +function _M.get(self, key) + local value, err = self.dict:get(key) + if not value then + return nil, err + end + + return value +end + +return _M diff --git a/apisix/plugins/limit-count/util.lua b/apisix/plugins/limit-count/util.lua new file mode 100644 index 000000000000..e76c683c4d66 --- /dev/null +++ b/apisix/plugins/limit-count/util.lua @@ -0,0 +1,164 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local redis_new = require("resty.redis").new +local redis_sentinel = require("resty.redis.connector") +local to_hex = require("resty.string").to_hex +local _M = {} + +local tostring = tostring + +local commit_script = core.string.compress_script([=[ + assert(tonumber(ARGV[3]) >= 0, "cost must be at least 0") + local ttl = redis.call('pttl', KEYS[1]) + if ttl < 0 then + redis.call('set', KEYS[1], ARGV[3], 'EX', ARGV[2]) + return {ARGV[3], ARGV[2] * 1000} + end + return {redis.call('incrby', KEYS[1], ARGV[3]), ttl} +]=]) +local commit_script_sha = to_hex(ngx.sha1_bin(commit_script)) + + +function _M.redis_cli(conf) + local red = redis_new() + local timeout = conf.redis_timeout or 1000 -- 1sec + + red:set_timeouts(timeout, timeout, timeout) + + local sock_opts = { + ssl = conf.redis_ssl, + ssl_verify = conf.redis_ssl_verify + } + + local ok, err = red:connect(conf.redis_host, conf.redis_port or 6379, sock_opts) + if not ok then + return nil, err + end + + local count + count, err = red:get_reused_times() + if 0 == count then + if conf.redis_password and conf.redis_password ~= '' then + local ok, err + if conf.redis_username then + ok, err = red:auth(conf.redis_username, conf.redis_password) + else + ok, err = red:auth(conf.redis_password) + end + if not ok then + return nil, err + end + end + + -- select db + if (conf.redis_database or 0) ~= 0 then + local ok, err = red:select(conf.redis_database) + if not ok then + return nil, "failed to change redis db, err: " .. err + end + end + elseif err then + -- core.log.info(" err: ", err) + return nil, err + end + return red, nil +end + +function _M.redis_cli_sentinel(conf) + local redis_conf = { + username = conf.redis_username, + password = conf.redis_password, + sentinel_username = conf.sentinel_username, + sentinel_password = conf.sentinel_password, + db = conf.redis_database or 0, + sentinels = conf.redis_sentinels or {}, + master_name = conf.redis_master_name, + role = conf.redis_role or "master", + connect_timeout = conf.redis_connect_timeout or 1000, + read_timeout = conf.redis_read_timeout or 1000, + keepalive_timeout = conf.redis_keepalive_timeout or 60000, + } + + local sentinel_client, err = redis_sentinel.new(redis_conf) + if not sentinel_client then + return nil, "failed to create redis client: " .. (err or "unknown error") + end + + -- In case of errors, returns "nil, err, previous_errors" where err is + -- the last error received, and previous_errors is a table of the previous errors. + local red, err, previous_errors = sentinel_client:connect() + if not red then + local err = "redis connection failed, err: " .. (err or "unknown error") + if previous_errors and #previous_errors > 0 then + err = err .. ", previous_errors: " .. core.table.concat(previous_errors, ", ") + end + return nil, err + end + return red, nil +end + + +function _M.redis_incoming(self, key, cost, keepalive) + if self.window_type == "sliding" then + return self.limit_count:incoming(key, cost) + end + + local red = self.red_cli + if not red then + return nil, "redis client not initialized", 0 + end + + local limit = self.limit + local window = self.window + key = self.plugin_name .. tostring(key) + + core.log.info("syncing limit count to redis, key: ", key, + ", limit: ", limit, ", window: ", window, ", cost: ", cost) + + local res, err + res, err = red:evalsha(commit_script_sha, 1, key, limit, window, cost) + if err and core.string.has_prefix(err, "NOSCRIPT") then + core.log.warn("redis evalsha failed: ", err, ". Falling back to eval") + res, err = red:eval(commit_script, 1, key, limit, window, cost) + end + if err then + return nil, err, 0 + end + + local remaining = limit - res[1] + local ttl = res[2] / 1000.0 + + if keepalive then + local conf = self.conf or {} + local ok, err = red:set_keepalive(conf.redis_keepalive_timeout or 10000, + conf.redis_keepalive_pool or 100) + if not ok then + core.log.error("failed to set keepalive for redis: ", err) + end + end + + + if remaining < 0 then + return nil, "rejected", ttl + end + + return 0, remaining, ttl +end + + +return _M diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 0cecf593faba..e40a096fad35 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -18,6 +18,81 @@ version: "3.8" services: + redis-master: + image: redis:latest + container_name: redis-master + hostname: redis-master + ports: + - "6479:6379" + volumes: + - ./data/master:/data + command: > + sh -c ' + echo "user master on >master-password ~* +@all" > /data/users.acl; + redis-server --appendonly yes --repl-diskless-load on-empty-db --protected-mode no --aclfile /data/users.acl + ' + networks: + - apisix_net + + redis-slave-1: + image: redis:latest + container_name: redis-slave-1 + hostname: redis-slave-1 + depends_on: + - redis-master + volumes: + - ./data/slave1:/data + command: [ + "redis-server", + "--appendonly", "yes", + "--replicaof", "redis-master", "6379", + "--repl-diskless-load", "on-empty-db", + "--protected-mode", "no" + ] + networks: + - apisix_net + + sentinel-1: + image: redis:latest + container_name: sentinel-1 + hostname: sentinel-1 + depends_on: + - redis-master + ports: + - "26379:26379" + command: > + sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf; + echo "sentinel monitor mymaster redis-master 6379 2" >> /etc/sentinel.conf; + echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf; + echo "sentinel down-after-milliseconds mymaster 10000" >> /etc/sentinel.conf; + echo "sentinel failover-timeout mymaster 10000" >> /etc/sentinel.conf; + echo "sentinel parallel-syncs mymaster 1" >> /etc/sentinel.conf; + redis-sentinel /etc/sentinel.conf' + networks: + - apisix_net + + sentinel-2: + image: redis:7.0 + container_name: sentinel-2 + hostname: sentinel-2 + depends_on: + - redis-master + ports: + - "26380:26379" + command: > + sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf; + echo "sentinel monitor mymaster redis-master 6379 2" >> /etc/sentinel.conf; + echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf; + echo "sentinel down-after-milliseconds mymaster 10000" >> /etc/sentinel.conf; + echo "sentinel failover-timeout mymaster 10000" >> /etc/sentinel.conf; + echo "sentinel parallel-syncs mymaster 1" >> /etc/sentinel.conf; + echo "aclfile /etc/sentinel.acl" >> /etc/sentinel.conf; + echo "user admin on >admin-password +@all" > /etc/sentinel.acl; + echo "user default off" >> /etc/sentinel.acl; + redis-sentinel /etc/sentinel.conf' + networks: + - apisix_net + ## keycloak apisix_keycloak: container_name: apisix_keycloak diff --git a/conf/config.yaml.example b/conf/config.yaml.example index e38435e646d7..49f99df18c94 100644 --- a/conf/config.yaml.example +++ b/conf/config.yaml.example @@ -298,6 +298,7 @@ nginx_config: # Config for render the template to generate n balancer-ewma-locks: 10m balancer-ewma-last-touched-at: 10m plugin-limit-req-redis-cluster-slot-lock: 1m + plugin-limit-count-lock: 10m plugin-limit-count-redis-cluster-slot-lock: 1m plugin-limit-conn-redis-cluster-slot-lock: 1m tracing_buffer: 10m diff --git a/docs/en/latest/plugins/limit-count.md b/docs/en/latest/plugins/limit-count.md index 4aef33afae20..52d521ea4b97 100644 --- a/docs/en/latest/plugins/limit-count.md +++ b/docs/en/latest/plugins/limit-count.md @@ -4,7 +4,7 @@ keywords: - Apache APISIX - API Gateway - Limit Count -description: The limit-count plugin uses a fixed window algorithm to limit the rate of requests by the number of requests within a given time interval. Requests exceeding the configured quota will be rejected. +description: The limit-count plugin uses fixed or sliding window algorithms to limit the rate of requests by the number of requests within a given time interval. Requests exceeding the configured quota will be rejected. ---