我正在使用多个连接同时访问HTTP服务器。我想对客户端进行节流,以响应服务器指示请求进入得太快。我不想改变我正在使用的HTTP库,而是想扩展它。
为此,如何实现具有以下约束的ThreadPoolExecutor?
- 遗嘱执行人有一个可选的利率限制。
- 当速率限制被禁用时,它会尽可能快地执行任务(
ThreadPoolExecutor
的正常行为)。 - 启用速率限制后,每秒最多可执行
N
个任务。 - 速率限制适用于所有执行器线程,而不是每个线程。
- 不允许爆发。也就是说,如果限制是每秒10个请求,我希望每100ms开始一个请求。我不希望所有的线程同时启动,然后在第二个剩余的时间里保持空闲。
- 速率限制是动态的。如果请求失败,则速率降低。如果请求成功,则速率增加。
- 当没有任务准备好执行时,线程被认为是空闲的(考虑到速率限制)。这意味着,我希望
ThreadPoolExecutor
标记这些线程是空闲的,并在它认为合适的时候将它们降速,而不是阻塞线程,直到达到速率限制。另一方面,一旦到了执行下一个任务的时间,线程应该再次加速。
What I've Looked Into
- ScheduledThreadPoolExecutor和DelayedQueue假设执行延迟在事件排队时间是已知的,而在我的例子中,在任务排队的时间和它被执行的时间之间速率可能会改变。
- RateLimiter可能是答案的一部分,但它本身还不够。
2条答案
按热度按时间m528fe3b1#
回答我自己的问题:
ScheduledThreadPoolExecutor
也至少保留一个线程,等待队列返回新任务。ThreadPoolExecutor
位于BlockingQueue
之上。当没有剩余任务时,它将阻塞BlockingQueue.take()
1.利率限制器。
1.一个
BlockingQueue
,隐藏元素,直到速率限制器允许使用它们。1.位于
BlockingQueue
顶部的ThreadPoolExecutor
。限速器
我提供了自己的基于Token Bucket algorithm算法的速率限制器,以克服
RateLimiter
的limitations。源代码为here。BlockingQueue
我实现了一个
BlockingDeque
(它扩展了BlockingQueue
),因为将来我想尝试将失败的任务推回到队列的前面。RateLimitedBlockingDeque.java
q35jwt9p2#
用途:
输出:
实施: