[목차]
2. Token Bucket, Leaky Bucket 알고리즘
3. Fixed Window, Sliding Window Logging, Sliding Window Counter 알고리즘
지난 포스팅에선, Rate Limit이란 무엇이고 설계시 주의사항, 특히 분산 환경에서의 Race Condition 처리에 대해 알아보았다.
이번 포스팅에선, Rate Limit을 구현할 수 있는 알고리즘 중 Bucket 기반의 알고리즘들에 대해 살펴보려고 한다.
이번 포스팅을 포함한 Rate Limit에 관련된 모든 코드는 아래 깃허브에서 확인할 수 있다.
GitHub - mag123c/rate-limiter
Contribute to mag123c/rate-limiter development by creating an account on GitHub.
github.com
Token Bucket
토큰 버킷 알고리즘은 단순한데, 지정된 용량만큼의 토큰을 가지고 있고, 사전 설정된 양의 토큰이 주기적으로 채워진다.
토큰이 꽉 찬 버킷에는 더이상 토큰이 추가되지 않으며, 버킷에 잔여 토큰이 없을 경우 요청은 버려진다.
만들어보기
바로 코드로 알아보자.
interface TokenBucketConfig {
capacity: number; // 버킷 최대 토큰 개수
refillRate: number; // 초당 재충전 개수
consumePerRequest: number; // 요청 당 소모량
}
버킷의 최대 토큰 개수와, 초당 재충전 개수를 설정한다.
요청당 소모량도 커스터마이징 할 수 있지만 거의 모든 상황에서 1로 사용되지 않을까 싶다.
interface TokenBucket {
token: number;
lastRefillTime: number; // 마지막 요청 = 충전 시간
}
버킷 구현체 정의를 위한 인터페이스이다. lastRequestTime 처럼 마지막 요청 시간값이 필요한데, 이는 요청 시 RateLimiter에서 현재 시간과 비교하여 얼마나 재충전해줄지 계산하기 위해서이다.
이런 구현 방식은 요청이 들어올 때만 충전이 일어나므로, 일정 시간 동안 요청이 없으면 충천이 지연될 수 있다.
이는 Token Bucket 방식의 자연스러운 특징이며, 평시 부하를 줄이는 효과가 있다.
interface RateLimiter {
tryConsume(key: string): void;
}
class TokenBucketRateLimiter implements RateLimiter {
private buckets: Map<string, TokenBucket> = new Map();
constructor(private config: TokenBucketConfig) {}
tryConsume(key: string): void {
if (!this.hasEnoughTokens(key)) {
throw new Error(`Rate Limit Exceeded for key: ${key}`);
}
this.consumeTokens(key);
}
private hasEnoughTokens(key: string): boolean {
let bucket = this.buckets.get(key);
if (!bucket) {
this.createBucket(key);
bucket = this.buckets.get(key)!;
}
this.refillTokens(bucket);
return bucket.token >= this.config.consumePerRequest;
}
private consumeTokens(key: string) {
const bucket = this.buckets.get(key)!;
bucket.token -= this.config.consumePerRequest;
bucket.lastRefillTime = Date.now();
}
private refillTokens(bucket: TokenBucket) {
const now = Date.now();
const elapsedMs = now - bucket.lastRefillTime;
const elapsedSeconds = elapsedMs / 1000;
const tokensToAdd = elapsedSeconds * this.config.refillRate;
bucket.token = Math.min(bucket.token + tokensToAdd, this.config.capacity);
bucket.lastRefillTime = now;
}
private createBucket(key: string) {
return this.buckets.set(key, {
token: this.config.capacity,
lastRefillTime: 0,
});
}
}
위 인터페이스들을 가지고 TokenBucketRateLimiter을 구현해보았다.
설명이 단순한 것 처럼 코드도 단순한데, 버킷은 요청 키값 : 토큰 개수와 마지막 요청 시간값 객체의 단순한 형태로 구성되어있다.
RateLimiter에서 요청이 들어올 때 마다 잔여 토큰을 확인하고 요청을 처리하거나 버린다. 매 요청마다 정확한 토큰 수를 계산하기 위해 잔여 토큰 확인 시점에 경과 시간만큼 토큰을 충전하는 방식을 사용한다.
필요하다면, 오래된 key를 정리하는 LRU나 TTL 기반의 Map을 사용하여 메모리를 더 최적화할 수 있다.
토큰 버킷 알고리즘의 특징과 구현 코드에서 알 수 있듯이, 명확하게 장점이 보이는데,
- 구현이 쉽다.
- 메모리 사용을 적게 한다. (키의 복잡성은 얼마든지 줄일 수 있다. 해싱, 키 압축 등을 통해서)
- 짧은 시간에 집중되는 트래픽 처리가 가능하다. (토큰이 버킷에 남아있기만 하다면, 버킷 수만큼의 burst가 가능)
하지만, 최적의 값을 찾는 것이 어렵고 튜닝이 어렵다는 점이 단점이 있다.
A/B 테스트를 통해 최적의 임계치를 찾아야하고, 엄격하게 제한할 경우 정상 사용자도 금방 차단될 수 있으며, 느슨하게 제한할 경우 서버 과부하의 위험이 발생할 것이다.
통합 테스트
통합 테스트도 구성해보았다.
실제 사용 환경을 테스트해보기 위해 express로 임시 서버와 rateLimiter 미들웨어를 구축해서 테스트를 진행했다.
(express 서버, 미들웨어 등의 전체 코드는 깃허브에서 확인할 수 있다.)
import request from "supertest";
import { createApp } from "../../app";
import { createTokenBucketMiddleware } from "../middleware";
describe("Token Bucket Rate Limiter Integration", () => {
it("토큰 사용량만큼 요청을 처리한다", async () => {
const rateLimiter = createTokenBucketMiddleware({
capacity: 3,
refillRate: 1,
consumePerRequest: 1,
});
const app = createApp({
middlewares: [rateLimiter],
});
// 용량만큼 요청 성공
for (let i = 0; i < 3; i++) {
const response = await request(app).get("/");
expect(response.status).toBe(200);
}
// 토큰 소진 시 429
const response = await request(app).get("/");
expect(response.status).toBe(429);
expect(response.body.error).toBe("Too Many Requests");
});
it("다른 IP 주소는 독립적인 토큰 버킷을 가진다", async () => {
const rateLimiter = createTokenBucketMiddleware({
capacity: 2,
refillRate: 0.1,
consumePerRequest: 1,
});
const app = createApp({
middlewares: [rateLimiter],
});
// IP 1.1.1.1로 2번 요청
await request(app).get("/").set("X-Forwarded-For", "1.1.1.1");
await request(app).get("/").set("X-Forwarded-For", "1.1.1.1");
// IP 2.2.2.2로 2번 요청
await request(app).get("/").set("X-Forwarded-For", "2.2.2.2");
await request(app).get("/").set("X-Forwarded-For", "2.2.2.2");
// 각 IP별로 토큰 소진 확인
const response1 = await request(app)
.get("/")
.set("X-Forwarded-For", "1.1.1.1");
const response2 = await request(app)
.get("/")
.set("X-Forwarded-For", "2.2.2.2");
expect(response1.status).toBe(429);
expect(response2.status).toBe(429);
});
it("토큰이 재충전되면 다시 요청을 보낼 수 있다", async () => {
const rateLimiter = createTokenBucketMiddleware({
capacity: 1,
refillRate: 10, // 초당 10개 충전
consumePerRequest: 1,
});
const app = createApp({
middlewares: [rateLimiter],
});
await request(app).get("/");
// 즉시 요청하면 실패
let response = await request(app).get("/");
expect(response.status).toBe(429);
// 100ms 후 1개 토큰 충전됨
await new Promise((resolve) => setTimeout(resolve, 100));
response = await request(app).get("/");
expect(response.status).toBe(200);
});
it("동시 요청 처리 시 정확한 토큰 소비", async () => {
const capacity = 50;
const rateLimiter = createTokenBucketMiddleware({
capacity,
refillRate: 0,
consumePerRequest: 1,
});
const app = createApp({ middlewares: [rateLimiter] });
// 50개 동시 요청
const promises = Array(capacity)
.fill(null)
.map(() => request(app).get("/"));
const responses = await Promise.all(promises);
const successCount = responses.filter((r) => r.status === 200).length;
const failCount = responses.filter((r) => r.status === 429).length;
expect(successCount).toBe(capacity);
expect(failCount).toBe(0);
// 추가 요청은 실패
const extraResponse = await request(app).get("/");
expect(extraResponse.status).toBe(429);
});
it("커스텀 키 생성기 사용 시 올바르게 동작", async () => {
const rateLimiter = createTokenBucketMiddleware(
{
capacity: 2,
refillRate: 0,
consumePerRequest: 1,
},
{
keyGenerator: (req) => req.headers["api-key"]?.toString() || "anonymous",
}
);
const app = createApp({ middlewares: [rateLimiter] });
// API 키별로 독립적인 버킷
await request(app).get("/").set("api-key", "key1");
await request(app).get("/").set("api-key", "key1");
await request(app).get("/").set("api-key", "key2");
await request(app).get("/").set("api-key", "key2");
// 각 키별로 토큰 소진 확인
const response1 = await request(app).get("/").set("api-key", "key1");
const response2 = await request(app).get("/").set("api-key", "key2");
expect(response1.status).toBe(429);
expect(response2.status).toBe(429);
// anonymous 키는 별도 버킷
const anonymousResponse = await request(app).get("/");
expect(anonymousResponse.status).toBe(200);
});
it("burst traffic 처리 가능 (Token Bucket의 장점)", async () => {
const rateLimiter = createTokenBucketMiddleware({
capacity: 100,
refillRate: 10, // 초당 10개
consumePerRequest: 1,
});
const app = createApp({ middlewares: [rateLimiter] });
// 한 번에 100개 요청 (burst) 가능
const promises = Array(100)
.fill(null)
.map(() => request(app).get("/"));
const responses = await Promise.all(promises);
const allSuccess = responses.every((r) => r.status === 200);
expect(allSuccess).toBe(true);
// 토큰 소진 후 즉시 추가 요청은 실패
const response = await request(app).get("/");
expect(response.status).toBe(429);
});
it("consumePerRequest 옵션이 올바르게 동작", async () => {
const rateLimiter = createTokenBucketMiddleware({
capacity: 10,
refillRate: 0,
consumePerRequest: 5, // 요청당 5개 토큰 소비
});
const app = createApp({ middlewares: [rateLimiter] });
// 2번만 요청 가능 (10 / 5 = 2)
const response1 = await request(app).get("/");
const response2 = await request(app).get("/");
const response3 = await request(app).get("/");
expect(response1.status).toBe(200);
expect(response2.status).toBe(200);
expect(response3.status).toBe(429);
});
it("skip 옵션 사용 시 특정 요청은 토큰 소비하지 않음", async () => {
const rateLimiter = createTokenBucketMiddleware(
{
capacity: 2,
refillRate: 0,
consumePerRequest: 1,
},
{
skip: (req) => req.path === "/health",
}
);
const app = createApp({
middlewares: [rateLimiter],
setupRoutes: (app) => {
app.get("/health", (_req, res) => res.json({ status: "ok" }));
},
});
// health check는 토큰 소비 안함
for (let i = 0; i < 10; i++) {
const response = await request(app).get("/health");
expect(response.status).toBe(200);
}
// 일반 요청은 토큰 소비
await request(app).get("/");
await request(app).get("/");
const response = await request(app).get("/");
expect(response.status).toBe(429);
});
});
Leaky Bucket
Leaky Bucket 알고리즘은, 요청 처리율이 고정되어있는 비동기 방식으로 큐를 사용한다.
큐에 작업을 넣어놓고 버킷에 설정된 처리율 만큼 주기 별로 처리하는데, 큐의 크기를 제한해서 사용하고 고정된 처리율을 갖고 있기 때문에 안정적인 처리가 가능하다. 말도 안되게 큐의 사이즈를 늘려놓지 않는 이상, 적절하게 튜닝된 큐의 작업들을 안정적으로 처리할 수 있다.
얼핏 들어보면 메시지 큐와 유사한데, 실제로도 그렇다.
실시간성이 부족하고 단시간에 많은 트래픽이 몰리는 경우에 적합하지 않다. 큐 사이즈를 초과하는 요청은 전부 버려지기 때문이다.
만들어보기
장단점을 살펴봤으니, 마찬가지로 코드를 짜봤다.
interface LeakyBucketConfig {
capacity: number; // 큐의 최대 크기
leakRate: number; // 초당 처리 개수
}
앞서 말한 것 처럼, 큐의 최대 크기와 초당 처리 개수를 지정한다.
interface AsyncRateLimiter {
enqueue(key: string, callback: () => void): Promise<void>;
}
class LeakyBucketRateLimiter implements AsyncRateLimiter {
private buckets: Map<string, Queue<() => void>> = new Map();
private intervals: Map<string, NodeJS.Timeout> = new Map();
constructor(private config: LeakyBucketConfig) {}
async enqueue(key: string, callback: () => void): Promise<void> {
let queue = this.buckets.get(key);
if (!queue) {
queue = new Queue<() => void>();
this.buckets.set(key, queue);
this.startProcessing(key);
}
if (queue.size() >= this.config.capacity) {
throw new Error("Rate Limit Exceed");
}
queue.add(callback);
}
private startProcessing(key: string) {
const interval = setInterval(() => {
const queue = this.buckets.get(key);
if (queue && !queue.isEmpty()) {
const callback = queue.poll();
callback?.();
}
}, 1000 / this.config.leakRate);
this.intervals.set(key, interval);
}
cleanup() {
this.intervals.forEach(interval => clearInterval(interval));
this.intervals.clear();
this.buckets.clear();
}
}
비동기로 동작하게끔 구성해보았다. 여기서는 간단하게 구현해보기 위해서 setInterval을 사용했는데 실제로는 컨텍스트가 완전 분리되어 동작하지 않을까 싶다.
통합 테스트
Token Bucket과 마찬가지로 통합 테스트를 작성해보았는데, 전체 코드나 다른 설정 코드들은 깃허브를 참조하길 바란다. (꾸벅)
import request from "supertest";
import { createApp } from "../../app";
import { createLeakyBucketMiddleware } from "../middleware";
describe("Leaky Bucket Rate Limiter Integration", () => {
let rateLimiter: any;
afterEach(() => {
// cleanup을 호출하여 interval 정리
if (rateLimiter && rateLimiter.limiter && rateLimiter.limiter.cleanup) {
rateLimiter.limiter.cleanup();
}
jest.clearAllTimers();
});
it("큐 용량 내에서는 요청이 대기 후 처리됨", async () => {
const rateLimiter = createLeakyBucketMiddleware({
capacity: 3,
leakRate: 10,
});
const app = createApp({
middlewares: [rateLimiter],
});
const promises = [];
for (let i = 0; i < 3; i++) {
promises.push(request(app).get("/"));
}
const responses = await Promise.all(promises);
responses.forEach((response) => {
expect(response.status).toBe(200);
});
});
it("큐 용량을 초과하면 즉시 429 반환", async () => {
const rateLimiter = createLeakyBucketMiddleware({
capacity: 2,
leakRate: 1,
});
const app = createApp({
middlewares: [rateLimiter],
});
const promises = [];
for (let i = 0; i < 3; i++) {
promises.push(
request(app)
.get("/")
.then((res) => res)
.catch((err) => err.response)
);
}
const responses = await Promise.all(promises);
// 3번째는 큐가 가득 차서 즉시 429
const statusCodes = responses.map((r) => r.status).sort();
expect(statusCodes).toEqual([200, 200, 429]);
});
it("다른 IP 주소는 독립적인 큐를 가진다", async () => {
const rateLimiter = createLeakyBucketMiddleware({
capacity: 2, // 각 IP당 큐 용량 2
leakRate: 10,
});
const app = createApp({
middlewares: [rateLimiter],
});
// 같은 IP(1.1.1.1)에서 3번 요청
const ip1Requests = [];
for (let i = 0; i < 3; i++) {
ip1Requests.push(
request(app)
.get("/")
.set("X-Forwarded-For", "1.1.1.1")
.then((res) => res)
.catch((err) => err.response)
);
}
// 다른 IP(2.2.2.2)에서 2번 요청
const ip2Requests = [];
for (let i = 0; i < 2; i++) {
ip2Requests.push(
request(app)
.get("/")
.set("X-Forwarded-For", "2.2.2.2")
);
}
const [ip1Results, ip2Results] = await Promise.all([
Promise.all(ip1Requests),
Promise.all(ip2Requests),
]);
// IP 1.1.1.1: 2개는 성공, 1개는 429
const ip1StatusCodes = ip1Results.map((r) => r.status).sort();
expect(ip1StatusCodes).toEqual([200, 200, 429]);
// IP 2.2.2.2: 모두 성공
ip2Results.forEach((result) => {
expect(result.status).toBe(200);
});
});
it("처리 속도에 따라 순차적으로 처리됨", async () => {
const processedTimes: number[] = [];
const rateLimiter = createLeakyBucketMiddleware({
capacity: 10,
leakRate: 5, // 초당 5개 처리 (200ms마다 1개)
});
const app = createApp({
middlewares: [rateLimiter],
setupRoutes: (app) => {
app.get("/track", (_req, res) => {
processedTimes.push(Date.now());
res.json({ processed: true });
});
},
});
const promises = [];
for (let i = 0; i < 3; i++) {
await new Promise((resolve) => setTimeout(resolve, 10));
promises.push(request(app).get("/track"));
}
await Promise.all(promises);
// 처리 간격 확인
expect(processedTimes).toHaveLength(3);
if (processedTimes.length >= 2) {
const gap1 = processedTimes[1] - processedTimes[0];
expect(gap1).toBeGreaterThanOrEqual(150);
expect(gap1).toBeLessThan(250);
}
if (processedTimes.length >= 3) {
const gap2 = processedTimes[2] - processedTimes[1];
expect(gap2).toBeGreaterThanOrEqual(150);
expect(gap2).toBeLessThan(250);
}
});
it("커스텀 키 생성기 사용 시 올바르게 동작", async () => {
const rateLimiter = createLeakyBucketMiddleware(
{
capacity: 1,
leakRate: 10,
},
{
keyGenerator: (req) =>
req.headers["api-key"]?.toString() || "anonymous",
}
);
const app = createApp({
middlewares: [rateLimiter],
});
// 각 API 키로 2번씩 요청
const key1Requests = [];
for (let i = 0; i < 2; i++) {
key1Requests.push(
request(app)
.get("/")
.set("api-key", "key1")
.then((res) => res)
.catch((err) => err.response)
);
}
const key2Request = request(app).get("/").set("api-key", "key2");
const [key1Results, key2Result] = await Promise.all([
Promise.all(key1Requests),
key2Request,
]);
// key1: 첫 번째는 성공, 두 번째는 429
expect(key1Results[0].status).toBe(200);
expect(key1Results[1].status).toBe(429);
// key2: 성공
expect(key2Result.status).toBe(200);
});
it("skip 옵션 사용 시 특정 요청은 큐에 추가되지 않음", async () => {
const rateLimiter = createLeakyBucketMiddleware(
{
capacity: 2,
leakRate: 1,
},
{
skip: (req) => req.path === "/health",
}
);
const app = createApp({
middlewares: [rateLimiter],
setupRoutes: (app) => {
app.get("/health", (_req, res) => res.json({ status: "ok" }));
},
});
// health check는 큐에 추가 안됨
for (let i = 0; i < 10; i++) {
const response = await request(app).get("/health");
expect(response.status).toBe(200);
}
// 일반 요청은 큐에 추가 (3개 요청, 2개만 큐에 들어감)
const normalRequests = [];
for (let i = 0; i < 3; i++) {
normalRequests.push(
request(app)
.get("/")
.then((res) => res)
.catch((err) => err.response)
);
}
const results = await Promise.all(normalRequests);
const statusCodes = results.map((r) => r.status).sort();
expect(statusCodes).toEqual([200, 200, 429]);
});
it("onLimitReached 콜백이 호출된다", async () => {
let callbackCalled = false;
let limitedPath = "";
const rateLimiter = createLeakyBucketMiddleware(
{
capacity: 1,
leakRate: 100, // 빠른 처리
},
{
onLimitReached: (req, res) => {
callbackCalled = true;
limitedPath = req.path;
res.status(429).json({ error: "Queue is full" });
},
}
);
const app = createApp({ middlewares: [rateLimiter] });
// 두 요청을 거의 동시에 보내서 두 번째가 거부되도록
const promises = [];
for (let i = 0; i < 2; i++) {
promises.push(
request(app)
.get("/api/test")
.then((res) => res)
.catch((err) => err.response)
);
}
const results = await Promise.all(promises);
expect(callbackCalled).toBe(true);
expect(limitedPath).toBe("/api/test");
// 하나는 200, 하나는 429
const statusCodes = results.map(r => r.status).sort();
expect(statusCodes).toEqual([200, 429]);
const rejectedResponse = results.find(r => r.status === 429);
expect(rejectedResponse.body.error).toBe("Queue is full");
}, 10000);
it("다양한 HTTP 메서드에 대해 동일하게 작동", async () => {
const rateLimiter = createLeakyBucketMiddleware({
capacity: 5,
leakRate: 100, // 빠른 처리
});
const app = createApp({ middlewares: [rateLimiter] });
// 6개 요청 (다양한 메서드 5개 + 추가 1개)
const promises = [
request(app).get("/"),
request(app).post("/"),
request(app).put("/"),
request(app).delete("/"),
request(app).patch("/"),
request(app).get("/"), // 6번째 요청
].map(p => p.then(res => res).catch(err => err.response));
const responses = await Promise.all(promises);
const statusCodes = responses.map((r: any) => r.status).sort();
// 5개는 큐에 들어가고, 1개는 429
expect(statusCodes).toEqual([200, 200, 200, 200, 200, 429]);
}, 10000);
it("일정한 속도로 요청 처리 (Leaky Bucket의 특징)", async () => {
const processedTimes: number[] = [];
const leakRate = 2; // 초당 2개 처리
const rateLimiter = createLeakyBucketMiddleware({
capacity: 10,
leakRate,
});
const app = createApp({
middlewares: [rateLimiter],
setupRoutes: (app) => {
app.get("/steady", (_req, res) => {
processedTimes.push(Date.now());
res.json({ processed: true });
});
},
});
// 5개 요청을 한 번에 보냄
const promises = Array(5)
.fill(null)
.map(() => request(app).get("/steady"));
await Promise.all(promises);
// 처리 시간 간격 확인 (약 500ms 간격)
expect(processedTimes).toHaveLength(5);
for (let i = 1; i < processedTimes.length; i++) {
const gap = processedTimes[i] - processedTimes[i - 1];
expect(gap).toBeGreaterThanOrEqual(400); // 여유 있게 400ms
expect(gap).toBeLessThan(600);
}
});
});
정리
특징 | Tocken Bucket | Leaky Bucket |
Burst 처리 | 가능 (idle 후 burst 가능) | 불가능 (고정 속도 처리) |
처리 속도 | 가변 (토큰 수만큼) | 고정 (leakRate 기준) |
트래픽 완화 방식 | 충전된 토큰 기반 소비 | 큐에 요청 저장 후 일정 속도 처리 |
적합한 상황 | 단시간 요청 집중 허용, 유저 행동 기반 | 일정 처리량 유지, 백오피스 안정화 등 |
Token Bucket과 Leaky Bucket 알고리즘에 대해 다뤘다.
두 방식은 각각 특성이 다르기 때문에 API의 특성, 사용자 행동 패턴이나 인프라 구조에 따라 적절하게 선택해야 한다.
다음 글에서는 Fixed Window, Sliding Window 알고리즘에 대해 살펴보고, NestJS의 Throttler은 어떤 알고리즘을 채택하여 어떻게 구현되어 있는지 한 번 살펴보려고 한다.
References.
알렉스 쉬 - 가상면접 사례로 배우는 대규모 시스템 설계 기초[1]
https://www.geeksforgeeks.org/system-design/rate-limiting-in-system-design/