我需要将数据发送到外部api,但此api对每个端点的请求数有限制(即:每分钟60个请求)。
数据来自Kafka,然后每条消息都发送到redis(因为我可以发送一个包含200个项目的请求)。所以,我使用一个简单的缓存来帮助我,我可以保证如果我的服务器宕机,我不会丢失任何消息。
问题是,有时Kafka开始发送许多消息,然后redis开始增长(要发送到api的消息超过100万条),我们不能在消息进来时发出太快的请求。然后,我们有一个很大的延迟。
我的第一个代码很简单: ExecutorService executor = Executors.newFixedThreadPool(1);
当消息很少且延迟最小时,这种方法非常有效。
所以,我做的第一件事就是把遗嘱执行人改成: ExecutorService executor = Executors.newCachedThreadPool();
因此,我可以要求新的线程,因为我需要使对外部api的请求更快,但是,我有一个问题,即每分钟请求的限制。
有一些端点,我可以每分钟发出300个请求,其他的是500个,其他的是30个,以此类推。
我做的代码不是很好,这是为了我工作的公司,所以,我真的需要把它做得更好。
所以,每次我要请求外部api时,我都会调用makerequest方法,这个方法是同步的,我知道我可以使用一个同步的列表,但是我认为在这种情况下,一个同步的方法更好。
// This is an inner class
private static class IntegrationType {
final Queue<Long> requests; // This queue is used to store the timestamp of the requests
final int maxRequestsPerMinute; // How many requests I can make per minute
public IntegrationType(final int maxRequestsPerMinute) {
this.maxRequestsPerMinute = maxRequestsPerMinute;
this.requests = new LinkedList<>();
}
synchronized void makeRequest() {
final long current = System.currentTimeMillis();
requests.add(current);
if (requests.size() >= maxRequestsPerMinute) {
long first = requests.poll(); // gets the first request
// The difference between the current request and the first request of the queue
final int differenceInSeconds = (int) (current - first) / 1000;
// if the difference is less than the maximum allowed
if (differenceInSeconds <= 60) {
// seconds to sleep.
final int secondsToSleep = 60 - differenceInSeconds;
sleep(secondsToSleep);
}
}
}
void sleep( int seconds){
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
所以,有一个我可以使用的数据结构?我应该考虑什么?
提前谢谢。
2条答案
按热度按时间qmb5sa221#
我实现了@gthanop所建议的一些不同的东西。
我发现,极限可能会改变。所以,我可能需要增加或缩小阻止列表。另一个原因是,要使我们当前的代码适应这种情况并不容易。还有一个,我们可能会使用多个示例,所以我们需要一个分布式锁。
所以,我实现了一些更容易的东西,但是没有@ghtanop的答案那么有效。
这是我的代码(改编,因为我不能显示公司代码):
hgb9j2n62#
如果正确理解你的问题,你可以使用
BlockingQueue
用一个ScheduledExecutorService
如下所示。BlockingQueue
我们有办法put
它只会在有可用空间的情况下将给定元素添加到队列中,否则方法调用将等待(直到有可用空间为止)。他们也有方法take
它只会在有任何元素的情况下从队列中删除元素,否则方法调用将等待(直到至少有一个元素可以执行)。具体来说,你可以使用
LinkedBlockingQueue
或者一个ArrayBlockingQueue
在任何给定的时间都可以用固定大小的元素来表示。此固定大小意味着您可以提交put
你可以提出很多要求,但你只能take
请求并每秒处理一次(例如每分钟发出60个请求)。示例化
LinkedBlockingQueue
对于固定大小,只需使用相应的构造函数(它接受大小作为参数)。LinkedBlockingQueue
威尔take
根据其文件以先进先出顺序排列的元素。示例化
ArrayBlockingQueue
对于固定大小,使用构造函数,它接受大小,但也接受boolean
已命名的标志fair
. 如果这个标志是true
然后队列将take
元素也按fifo顺序排列。那你就可以喝一杯了
ScheduledExecutorService
(而不是在循环中等待)您可以在其中提交一个Runnable
哪个会take
从队列中,与外部api进行通信,然后等待通信之间所需的延迟。下面是上面的一个简单示例:
注意我用了
scheduleWithFixedDelay
而不是scheduleAtFixedRate
. 您可以在他们的文档中看到,第一个将等待延迟之间的通话结束Runnable
开始下一个,而第二个不会等待,只是重新提交Runnable
每period
时间单位。但是我们不知道与外部api通信需要多长时间,所以如果我们scheduleAtFixedRate
用一个period
每分钟一次,但请求需要一分钟以上才能完成?。。。然后在第一个请求尚未完成时提交一个新请求。所以我用scheduleWithFixedDelay
而不是scheduleAtFixedRate
. 但还有更多:我使用了单线程调度执行器服务。这是否意味着如果第一个呼叫没有完成,那么第二个呼叫就不能启动?。。。如果你看一下Executors#newSingleThreadScheduledExecutor()
,因为单线程核心池大小而可能发生第二次调用,并不意味着池的大小是固定的。我用的另一个理由
scheduleWithFixedDelay
是因为请求量不足。例如,队列是空的怎么办?然后调度也应该等待,而不是提交Runnable
再一次。另一方面,如果我们使用
scheduleWithFixedDelay
,比如说延迟1/60f
在调度之间间隔几秒钟,并且在一分钟内提交了60多个请求,那么这肯定会使我们对外部api的吞吐量下降,因为scheduleWithFixedDelay
我们可以保证最多有60个对外部api的请求。它可以比这个小,但我们不想这样。我们希望每次都能达到极限。如果您不关心这个问题,那么就可以使用上面的实现了。但是假设你每次都尽可能地接近极限,在这种情况下,据我所知,你可以用一个定制的调度程序来实现这一点,这将是比第一个更干净的解决方案,但更精确的时间。
总之,在上面的实现中,您需要确保与外部api的通信尽可能快地服务于请求。
最后,我要提醒你,如果
BlockingQueue
我建议的实现不是put
按先进先出的顺序。我的意思是,如果两个请求几乎同时到达,而队列已经满了呢?他们都会等,但是第一个到达的人会等着得到吗put
第一个是艾德,第二个是艾德put
埃德先来?我不知道。如果您不关心某些请求在外部api中被无序地发出,那么不要担心,直到目前为止都使用代码。但是,如果您确实关心,并且能够在每个请求中输入一个序列号,那么您可以使用PriorityQueue
之后BlockingQueue
,甚至尝试PriorityBlockingQueue
(不幸的是,它是无限的)。这会使事情变得更加复杂,所以我没有用PriorityQueue
. 至少我已经尽力了,我希望我能说出一些好主意。我并不是说这篇文章是一个完整的解决所有问题的办法,但它是一些考虑开始。