Java: a data structure for managing the maximum number of API requests per minute

twh00eeo · posted 2021-06-04 · in Kafka

I need to send data to an external API, but this API has a limit on the number of requests per endpoint (e.g. 60 requests per minute).
The data comes from Kafka, and then every message goes to Redis (because I can send one request with up to 200 items). So I use a simple cache to help me, and I can guarantee that if my server goes down I will not lose any message.
The problem is that sometimes Kafka starts sending a lot of messages and Redis starts to grow (more than 1 million messages waiting to be sent to the API), and we cannot make the requests as fast as the messages come in. We end up with a big delay.
My first code was simple: ExecutorService executor = Executors.newFixedThreadPool(1); This works very well when there are few messages and the delay is minimal.
So the first thing I did was change the executor to: ExecutorService executor = Executors.newCachedThreadPool(); With this I can request new threads as I need them, to make the requests to the external API faster, but then I run into the problem of the per-minute request limit.
There are endpoints where I can make 300 requests per minute, others 500, others 30, and so on.
The code I wrote is not very good, and it is for the company I work for, so I really need to make it better.
So, every time I am going to call the external API I call the makeRequest method. This method is synchronized; I know I could use a synchronized list instead, but I think a synchronized method is better in this case.

// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request

            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;

            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

    void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
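
For reference, here is a minimal sketch of how a class like this might be driven, assuming one IntegrationType instance per endpoint and that the class above is made accessible outside its enclosing class (the endpoint names and the INTEGRATIONS map are made up for illustration, they are not part of the question):

import java.util.Map;

public class IntegrationTypeUsage {

    // Hypothetical registry: one rate-limited IntegrationType per external endpoint.
    private static final Map<String, IntegrationType> INTEGRATIONS = Map.of(
            "orders", new IntegrationType(60),     // 60 requests per minute
            "customers", new IntegrationType(300)  // 300 requests per minute
    );

    static void send(final String endpoint, final String payload) {
        // Blocks (sleeps) if the per-minute budget of this endpoint is already used up.
        INTEGRATIONS.get(endpoint).makeRequest();
        // ... then the actual call to the external API would go here.
        System.out.println("Sending to " + endpoint + ": " + payload);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            send("orders", "message-" + i);
        }
    }
}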

So, is there a data structure I could use for this? What should I take into consideration?
Thanks in advance.

qmb5sa22 (answer 1)

I implemented something a bit different from what @gthanop suggested.
What I discovered is that the limits may change, so I might need to grow or shrink the blocking list. Another reason is that it would not be easy to adapt our current code to that. And one more: we might use more than one instance, so we would need a distributed lock.
So I implemented something simpler, even though it is not as efficient as @gthanop's answer.
Here is my code (adapted, since I cannot show the company's code):

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class Teste {

    private static enum ExternalApi {    
        A, B, C;
    }

    private static class RequestManager {

        private long firstRequest; // First request in one minute

        // how many requests we have made so far in the current minute
        private int requestsCount = 0;

        // A timer task that runs every minute and resets the request count and the first-request timestamp
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        RequestManager() {
            final long initialDelay = 0L;
            final long fixedRate = 60;

            executor.scheduleAtFixedRate(() -> {
                System.out.println("Clearing the current count!");
                requestsCount = 0;
                firstRequest = System.currentTimeMillis();
            }, initialDelay, fixedRate, TimeUnit.SECONDS);
        }

        void incrementRequest() {
            requestsCount++;
        }

        long getFirstRequest() {
            return firstRequest;
        }

        boolean requestsExceeded(final int requestLimit) {
            return requestsCount >= requestLimit;
        }

    }

    public static class RequestHelper {

        private static final byte SECONDS_IN_MINUTE = 60;
        private static final short MILLISECONDS_IN_SECOND = 1000;
        private static final byte ZERO_SECONDS = 0;

        // Table holding the RequestManager (time and count of requests) for each type
        private final Map<Integer, RequestManager> requests;

        // Table that contains the limits of each type of request
        private final Map<Integer, Integer> requestLimits;

        /**
         * We need an array of semaphores because we might need to block the requests for one type but not for another.
         */
        private final Semaphore[] semaphores;

        private RequestHelper(){

            // one semaphore per type
            semaphores = new Semaphore[ExternalApi.values().length];
            requests = new ConcurrentHashMap<>();
            requestLimits = new HashMap<>();

            for (final ExternalApi type : ExternalApi.values()) {

                // Binary semaphore, must be fair, because we are updating things.
                semaphores[type.ordinal()] = new Semaphore(1, true);
            }
        }

        /**
         * When my token expires, I must update this, because the limits might change.
         * @param limits the new api limits
         */
        protected void updateLimits(final Map<ExternalApi, Integer> limits) {
            limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
        }

        /**
         * Increments the counter for the given request type.
         * Using the mutual exclusion lock, we can block other threads that are trying to
         * make a request to the API.
         * If the incoming requests would exceed the maximum, the thread sleeps for N seconds (60 - time since the first request).
         * Since we are using a binary semaphore, incoming requests are blocked until the sleeping thread wakes up and releases the semaphore.
         *
         * @param type of the integration, Supp, List, PET etc ...
         */
        protected final void addRequest(final ExternalApi type) {

            // the index of this request
            final int requestIndex = type.ordinal();

            // get the semaphore for this type
            final Semaphore semaphore = semaphores[requestIndex];

            // Try to acquire a permit, if no permit is available, it will block until one is available.
            semaphore.acquireUninterruptibly();

            // gets the RequestManager for this type
            final RequestManager requestManager = getRequest(requestIndex);

            // increments the number of requests
            requestManager.incrementRequest();

            if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {

                // seconds remaining in the minute after the time it took to reach the maximum number of requests
                final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;

                // We reached the maximum in less than a minute
                if (secondsToSleep > ZERO_SECONDS) {
                    System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before make a new request!\n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
                    sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
                }
            }
            // releases the semaphore
            semaphore.release();
        }

        private final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Gets the RequestManager for the given index; if this is the first request
         * for that type, it creates the RequestManager instance.
         * @param index
         * @return a RequestManager instance
         */
        private RequestManager getRequest(final int index) {
            RequestManager request = requests.get(index);
            if(request == null) {
                request = new RequestManager();
                requests.put(index, request);
            }
            return request;
        }
    }

    public static void main(String[] args) {

        final RequestHelper requestHelper = new RequestHelper();

        final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);

        // update the limits
        requestHelper.updateLimits(apiLimits);

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        executor.scheduleWithFixedDelay(() -> {
            System.out.println("A new request is going to happen");
            requestHelper.addRequest(ExternalApi.A);
            sleep(65);
        }, 0, 100, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("B new request is going to happen");
            requestHelper.addRequest(ExternalApi.B);
            sleep(50);
        }, 0, 200, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("C new request is going to happen");
            requestHelper.addRequest(ExternalApi.C);
            sleep(30);
        }, 0, 300, TimeUnit.MILLISECONDS);

    }

    private static final void sleep(final long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } 
}

hgb9j2n6 (answer 2)

If I understand your problem correctly, you can use a BlockingQueue with a ScheduledExecutorService as follows. BlockingQueues have the method put, which will only add the given element to the queue if there is available space, otherwise the method call will wait (until there is free space). They also have the method take, which will only remove an element from the queue if there are any elements at all, otherwise the method call will wait (until there is at least one element to take).
Specifically, you can use a LinkedBlockingQueue or an ArrayBlockingQueue, which can be given a fixed number of elements to hold at any given time. This fixed size means that you can submit with put as many requests as you like, but you will only take requests and process them, for example, once every second (so as to make 60 requests per minute).
To instantiate a LinkedBlockingQueue with a fixed size, just use the corresponding constructor (which accepts the size as an argument). A LinkedBlockingQueue will take elements in FIFO order according to its documentation.
To instantiate an ArrayBlockingQueue with a fixed size, use the constructor which accepts the size but also the boolean flag named fair. If this flag is true, then the queue will take elements in FIFO order as well.
Then you can have a ScheduledExecutorService (instead of waiting in a loop), to which you submit a single Runnable that will take from the queue, communicate with the external API, and then wait for the required delay between communications.
A simple example of the above follows:

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

    public static class RequestSubmitter implements Runnable {
        private final BlockingQueue<Request> q;

        public RequestSubmitter(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }

        @Override
        public void run() {
            try {
                q.put(new Request()); //Will block until available capacity.
            }
            catch (final InterruptedException ix) {
                System.err.println("Interrupted!"); //Not expected to happen under normal use.
            }
        }
    }

    public static class Request {
        public void make() {
            try {
                //Let's simulate the communication with the external API:
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
            }
            catch (final InterruptedException ix) {
                //Let's say here we failed to communicate with the external API...
            }
        }
    }

    public static class RequestImplementor implements Runnable {
        private final BlockingQueue<Request> q;

        public RequestImplementor(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }

        @Override
        public void run() {
            try {
                q.take().make(); //Will block until there is at least one element to take.
                System.out.println("Request made.");
            }
            catch (final InterruptedException ix) {
                //Here the 'taking' from the 'q' is interrupted.
            }
        }
    }

    public static void main(final String[] args) throws InterruptedException {

        /*The following initialization parameters specify that we
        can communicate with the external API 60 times per 1 minute.*/
        final int maxRequestsPerTime = 60;
        final TimeUnit timeUnit = TimeUnit.MINUTES;
        final long timeAmount = 1;

        final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
        //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);

        //Submit some RequestSubmitters to the pool...
        final ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 50_000; ++i)
            pool.submit(new RequestSubmitter(q));

        System.out.println("Serving...");

        //Find out the period between communications with the external API:
        final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
        //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.

        //The most important line probably:
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
    }
}

Note that I used scheduleWithFixedDelay and not scheduleAtFixedRate. You can see in their documentation that the first one waits for the given delay between the end of one run of the submitted Runnable and the start of the next one, while the second one does not wait and simply re-submits the Runnable every period time units. But we don't know how long the communication with the external API takes, so what if we scheduleAtFixedRate with a period of, say, one minute, but the request takes more than a minute to be made?... Then a new request would be submitted while the first one has not yet finished. That is why I used scheduleWithFixedDelay instead of scheduleAtFixedRate. But there is more: I used a single-thread scheduled executor service. Does that mean that if the first call has not finished, a second one cannot start?... If you take a look at the implementation of Executors#newSingleThreadScheduledExecutor(), a second call may still occur, because a single-thread core pool size does not mean the pool is of fixed size.
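
To make the difference concrete, here is a small standalone sketch (mine, not part of the answer's code) contrasting the two scheduling methods with a task that deliberately takes longer than the period/delay:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateVsFixedDelay {

    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Simulated "slow request": it takes about 3 seconds, longer than the 1-second period/delay.
        final Runnable slowTask = () -> {
            System.out.println(System.currentTimeMillis() + " run on " + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // With scheduleAtFixedRate the runs pile up: the same periodic task never runs concurrently,
        // but late executions start back to back, roughly every 3 seconds here.
        scheduler.scheduleAtFixedRate(slowTask, 0, 1, TimeUnit.SECONDS);

        // With scheduleWithFixedDelay the full 1-second delay is counted from the END of the
        // previous run, so the task starts roughly every 4 seconds.
        scheduler.scheduleWithFixedDelay(slowTask, 0, 1, TimeUnit.SECONDS);
    }
}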
Another reason I used scheduleWithFixedDelay is underflow of requests. For example, what if the queue is empty? Then the scheduling should wait as well, instead of re-submitting the Runnable right away.
On the other hand, if we use scheduleWithFixedDelay with, say, a delay of 1/60 of a minute between schedulings, and more than 60 requests are submitted within a minute, then this will surely drop our throughput towards the external API below the maximum, because with scheduleWithFixedDelay we can guarantee that at most 60 requests per minute will be made to the external API. It can be less than that, but we don't want it to be; we want to reach the limit every single time. If that is not a concern for you, then you can use the above implementation as it is.
But let's say you do care about getting as close to the limit as possible every time. In that case, as far as I know, you can do it with a custom scheduler, which would be a less clean solution than the first one, but more precise in timing.
Bottom line: with the above implementation, you need to make sure that the communication with the external API serves the requests as fast as possible.
Finally, let me warn you that I did not look into what happens if the BlockingQueue implementations I suggested do not put in FIFO order. I mean, what if two requests arrive at almost the same time while the queue is full? They will both wait, but will the one that arrived first also be the first one to be put, or will the second one be put first? I don't know. If you don't care about some requests being made to the external API out of order, then don't worry and use the code as it is up to this point. If you do care, however, and you are able to put, for example, a sequence number into each request, then you could use a PriorityQueue after the BlockingQueue, or even experiment with a PriorityBlockingQueue (which is unfortunately unbounded). That would complicate things even more, which is why I did not post code with a PriorityQueue. At least I did my best and I hope I gave you some good ideas. I am not saying this post is a complete solution to all your problems, but it gives you some considerations to start from.
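
For what it is worth, here is a minimal sketch of the sequence-number idea just mentioned, assuming each request can carry a number assigned at submission time (the SequencedRequest class is made up for illustration and is not part of the answer; note that PriorityBlockingQueue is unbounded, so it does not give you the back-pressure of an ArrayBlockingQueue):

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class SequencedRequests {

    // Hypothetical request wrapper ordered by an ever-increasing sequence number.
    static final class SequencedRequest implements Comparable<SequencedRequest> {
        private static final AtomicLong SEQUENCE = new AtomicLong();

        final long seq = SEQUENCE.getAndIncrement();
        final String payload;

        SequencedRequest(final String payload) {
            this.payload = payload;
        }

        @Override
        public int compareTo(final SequencedRequest other) {
            return Long.compare(this.seq, other.seq);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final PriorityBlockingQueue<SequencedRequest> q = new PriorityBlockingQueue<>();

        q.put(new SequencedRequest("first"));
        q.put(new SequencedRequest("second"));
        q.put(new SequencedRequest("third"));

        // take() always returns the element with the lowest sequence number still in the queue.
        System.out.println(q.take().payload); // first
        System.out.println(q.take().payload); // second
        System.out.println(q.take().payload); // third
    }
}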
