java 非阻塞速率限制ThreadPoolExecutor

x759pob2  于 2023-05-27  发布在  Java
关注(0)|答案(2)|浏览(131)

我正在使用多个连接同时访问HTTP服务器。我想对客户端进行节流,以响应服务器指示请求进入得太快。我不想改变我正在使用的HTTP库,而是想扩展它。
为此,如何实现具有以下约束的ThreadPoolExecutor

  • 遗嘱执行人有一个可选的利率限制。
  • 当速率限制被禁用时,它会尽可能快地执行任务(ThreadPoolExecutor的正常行为)。
  • 启用速率限制后,每秒最多可执行N个任务。
  • 速率限制适用于所有执行器线程,而不是每个线程。
  • 不允许爆发。也就是说,如果限制是每秒10个请求,我希望每100ms开始一个请求。我不希望所有的线程同时启动,然后在第二个剩余的时间里保持空闲。
  • 速率限制是动态的。如果请求失败,则速率降低。如果请求成功,则速率增加。
  • 当没有任务准备好执行时,线程被认为是空闲的(考虑到速率限制)。这意味着,我希望ThreadPoolExecutor标记这些线程是空闲的,并在它认为合适的时候将它们降速,而不是阻塞线程,直到达到速率限制。另一方面,一旦到了执行下一个任务的时间,线程应该再次加速。

What I've Looked Into

  • ScheduledThreadPoolExecutorDelayedQueue假设执行延迟在事件排队时间是已知的,而在我的例子中,在任务排队的时间和它被执行的时间之间速率可能会改变。
  • RateLimiter可能是答案的一部分,但它本身还不够。
m528fe3b

m528fe3b1#

回答我自己的问题:

  • 不可能有一个完全不阻塞的解决方案。即使是ScheduledThreadPoolExecutor也至少保留一个线程,等待队列返回新任务。
  • ThreadPoolExecutor位于BlockingQueue之上。当没有剩余任务时,它将阻塞BlockingQueue.take()
  • 该解决方案有3个移动部件:

1.利率限制器。
1.一个BlockingQueue,隐藏元素,直到速率限制器允许使用它们。
1.位于BlockingQueue顶部的ThreadPoolExecutor

限速器

我提供了自己的基于Token Bucket algorithm算法的速率限制器,以克服RateLimiterlimitations。源代码为here

BlockingQueue

我实现了一个BlockingDeque(它扩展了BlockingQueue),因为将来我想尝试将失败的任务推回到队列的前面。
RateLimitedBlockingDeque.java

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static org.bitbucket.cowwoc.requirements.core.Requirements.requireThat;

/**
 * A blocking deque of elements, in which an element can only be taken when the deque-wide delay has expired.
 * <p>
 * The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if
 * unspecified, is equal to {@link Integer#MAX_VALUE}.
 * <p>
 * Even though methods that take elements, such as {@code take} or {@code poll}, respect the deque-wide delay the
 * remaining methods treat them as normal elements. For example, the {@code size} method returns the count of both
 * expired and unexpired elements.
 * <p>
 * This class and its iterator implement all of the <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @param <E> the type of elements in the deque
 * @author Gili Tzabari
 */
public final class RateLimitedBlockingDeque<E> implements BlockingDeque<E>
{
    private final int capacity;
    private final LinkedBlockingDeque<E> delegate;
    private final Bucket rateLimit = new Bucket();

    /**
     * Creates a {@code RateLimitedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
     */
    public RateLimitedBlockingDeque()
    {
        this.capacity = Integer.MAX_VALUE;
        this.delegate = new LinkedBlockingDeque<>();
    }

    /**
     * Creates a {@code RateLimitedBlockingDeque} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public RateLimitedBlockingDeque(int capacity)
    {
        this.capacity = capacity;
        this.delegate = new LinkedBlockingDeque<>(capacity);
    }

    /**
     * @return the capacity of the deque
     */
    public int getCapacity()
    {
        return capacity;
    }

    /**
     * Indicates the rate at which elements may be taken from the queue.
     *
     * @param elements the number of elements that may be taken per {@code period}
     * @param period   indicates how often elements may be taken
     * @throws NullPointerException     if {@code period} is null
     * @throws IllegalArgumentException if the requested rate is greater than element per nanosecond
     */
    public void setRate(long elements, Duration period)
    {
        synchronized (rateLimit)
        {
            Limit newLimit = new Limit(elements, period, 0, Long.MAX_VALUE);
            if (rateLimit.getLimits().isEmpty())
                rateLimit.addLimit(newLimit);
            else
            {
                Limit oldLimit = rateLimit.getLimits().iterator().next();
                rateLimit.replaceLimit(oldLimit, newLimit);
            }
        }
    }

    /**
     * Allows consumption of elements without limit.
     */
    public void removeRate()
    {
        synchronized (rateLimit)
        {
            rateLimit.removeAllLimits();
        }
    }

    @Override
    public void addFirst(E e)
    {
        delegate.addFirst(e);
    }

    @Override
    public void addLast(E e)
    {
        delegate.addLast(e);
    }

    @Override
    public boolean offerFirst(E e)
    {
        return delegate.offerFirst(e);
    }

    @Override
    public boolean offerLast(E e)
    {
        return delegate.offerLast(e);
    }

    @Override
    public void putFirst(E e) throws InterruptedException
    {
        delegate.putFirst(e);
    }

    @Override
    public void putLast(E e) throws InterruptedException
    {
        delegate.putLast(e);
    }

    @Override
    public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offerFirst(e, timeout, unit);
    }

    @Override
    public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offerLast(e, timeout, unit);
    }

    @Override
    public E removeFirst()
    {
        if (rateLimit.tryConsume())
            return delegate.removeFirst();
        throw new NoSuchElementException();
    }

    @Override
    public E removeLast()
    {
        if (rateLimit.tryConsume())
            return delegate.removeLast();
        throw new NoSuchElementException();
    }

    @Override
    public E pollFirst()
    {
        if (rateLimit.tryConsume())
            return delegate.pollFirst();
        return null;
    }

    @Override
    public E pollLast()
    {
        if (rateLimit.tryConsume())
            return delegate.pollLast();
        return null;
    }

    @Override
    public E takeFirst() throws InterruptedException
    {
        rateLimit.consume();
        return delegate.takeFirst();
    }

    @Override
    public E takeLast() throws InterruptedException
    {
        rateLimit.consume();
        return delegate.takeLast();
    }

    @Override
    public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException
    {
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollFirst(timeout, unit);
        return null;
    }

    @Override
    public E pollLast(long timeout, TimeUnit unit) throws InterruptedException
    {
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollLast(timeout, unit);
        return null;
    }

    @Override
    public E getFirst()
    {
        return delegate.getFirst();
    }

    @Override
    public E getLast()
    {
        return delegate.getLast();
    }

    @Override
    public E peekFirst()
    {
        return delegate.peekFirst();
    }

    @Override
    public E peekLast()
    {
        return delegate.peekLast();
    }

    @Override
    public boolean removeFirstOccurrence(Object o)
    {
        return delegate.removeFirstOccurrence(o);
    }

    @Override
    public boolean removeLastOccurrence(Object o)
    {
        return delegate.removeLastOccurrence(o);
    }

    @Override
    public boolean add(E e)
    {
        return delegate.add(e);
    }

    @Override
    public boolean offer(E e)
    {
        return delegate.offer(e);
    }

    @Override
    public void put(E e) throws InterruptedException
    {
        putLast(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        return delegate.offer(e, timeout, unit);
    }

    @Override
    public E remove()
    {
        return removeFirst();
    }

    @Override
    public E poll()
    {
        return pollFirst();
    }

    @Override
    public E take() throws InterruptedException
    {
        return takeFirst();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        return pollFirst(timeout, unit);
    }

    @Override
    public E element()
    {
        return getFirst();
    }

    @Override
    public E peek()
    {
        return peekFirst();
    }

    @Override
    public int remainingCapacity()
    {
        return delegate.remainingCapacity();
    }

    @Override
    public int drainTo(Collection<? super E> c)
    {
        int result = 0;
        while (true)
        {
            E next = pollFirst();
            if (next == null)
                break;
            c.add(next);
        }
        return result;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        int result = 0;
        do
        {
            E next = pollFirst();
            if (next == null)
                break;
            c.add(next);
        }
        while (result < maxElements);
        return result;
    }

    @Override
    public void push(E e)
    {
        addFirst(e);
    }

    @Override
    public E pop()
    {
        return removeFirst();
    }

    @Override
    public boolean remove(Object o)
    {
        return removeFirstOccurrence(o);
    }

    @Override
    public int size()
    {
        return delegate.size();
    }

    @Override
    public boolean contains(Object o)
    {
        return delegate.contains(o);
    }

    @Override
    public Object[] toArray()
    {
        return delegate.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a)
    {
        return delegate.toArray(a);
    }

    @Override
    public String toString()
    {
        return delegate.toString();
    }

    @Override
    public void clear()
    {
        delegate.clear();
    }

    @Override
    public Iterator<E> iterator()
    {
        return wrap(delegate.iterator());
    }

    /**
     * @param delegateIterator the iterator to delegate to
     * @return an iterator that respects the rate-limit
     */
    private Iterator<E> wrap(Iterator<E> delegateIterator)
    {
        return new Iterator<E>()
        {
            private E previousElement = null;

            @Override
            public boolean hasNext()
            {
                return delegateIterator.hasNext();
            }

            @Override
            public E next()
            {
                return delegateIterator.next();
            }

            @Override
            public void remove()
            {
                if (previousElement == null)
                    throw new IllegalStateException("next() not invoked, or remove() already invoked");
                try
                {
                    rateLimit.consume();
                }
                catch (InterruptedException e)
                {
                    throw new IllegalStateException(e);
                }
                delegateIterator.remove();
                previousElement = null;
            }
        };
    }

    @Override
    public Iterator<E> descendingIterator()
    {
        return wrap(delegate.descendingIterator());
    }

    @Override
    public boolean addAll(Collection<? extends E> c)
    {
        requireThat("c", c).isNotNull().isNotEqualTo("this", this);
        boolean modified = false;
        for (E e: c)
            if (add(e))
                modified = true;
        return modified;
    }

    @Override
    public boolean isEmpty()
    {
        return delegate.isEmpty();
    }

    @Override
    public boolean containsAll(Collection<?> c)
    {
        return delegate.containsAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c)
    {
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
        {
            E element = i.next();
            if (c.contains(element))
            {
                i.remove();
                modified = true;
            }
        }
        return modified;
    }

    @Override
    public boolean retainAll(Collection<?> c)
    {
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
        {
            E element = i.next();
            if (!c.contains(element))
            {
                i.remove();
                modified = true;
            }
        }
        return modified;
    }

    @Override
    public int hashCode()
    {
        return delegate.hashCode();
    }

    @Override
    public boolean equals(Object obj)
    {
        return delegate.equals(obj);
    }
}
q35jwt9p

q35jwt9p2#

用途:

public static void main(String[] args) throws InterruptedException {
    RateFriendlyThreadPoolExecutor executor = new RateFriendlyThreadPoolExecutor(3, 5, 1, SECONDS, new LinkedBlockingDeque<>(100));
    executor.setRate(10);
    executor.setMinRate(1);
    executor.setMaxRate(100);

    for (int i = 0; i < 1000; i++) {
        int lap = i;
        executor.execute(() -> System.out.printf("%s (%s) - %s - %s%n", lap, executor.rate, LocalDateTime.now(), Thread.currentThread().getName()));
    }

    executor.shutdown();
    executor.awaitTermination(60, SECONDS);
}

输出:

0 (10) - 2023-05-27T00:51:47.276183 - pool-1-thread-1
1 (11) - 2023-05-27T00:51:47.380860100 - pool-1-thread-2
2 (11) - 2023-05-27T00:51:47.491031600 - pool-1-thread-3
105 (11) - 2023-05-27T00:51:47.693108100 - main
103 (12) - 2023-05-27T00:51:47.804222900 - pool-1-thread-4
104 (12) - 2023-05-27T00:51:47.913482800 - pool-1-thread-5
3 (12) - 2023-05-27T00:51:48.006381300 - pool-1-thread-1
4 (12) - 2023-05-27T00:51:48.099608400 - pool-1-thread-2
5 (12) - 2023-05-27T00:51:48.192840500 - pool-1-thread-3
109 (12) - 2023-05-27T00:51:48.379760400 - main

实施:

import static java.lang.System.nanoTime;
import static java.util.concurrent.locks.LockSupport.parkNanos;

public static class RateFriendlyThreadPoolExecutor extends ThreadPoolExecutor {

    private AtomicInteger rate = new AtomicInteger();
    private AtomicInteger minRate = new AtomicInteger();
    private AtomicInteger maxRate = new AtomicInteger();
    private AtomicLong leapTime = new AtomicLong();
    private ReentrantLock rateLock = new ReentrantLock();

    public RateFriendlyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, (r, e) -> overflow(r, (RateFriendlyThreadPoolExecutor) e));
    }

    @Override
    public void execute(Runnable command) {
        super.execute(() -> run(command));
    }

    private void run(Runnable runnable) {
        int rateSnapshot = rate.get();
        limitRate(rateSnapshot, leapTime, rateLock);
        try {

            runnable.run();

            rate.compareAndSet(rateSnapshot, Math.min(rateSnapshot + 1, maxRate.get()));
        } catch (Exception e) {
            if (!isThroated(e))
                throw e;
            System.out.println("rejected at rate " + rateSnapshot);
            rate.set(minRate.get());
            execute(runnable);
        }
    }

    public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
        if (rate == 0)
            return;
        long targetLeapTime = 1000_000_000 / rate;
        rateLock.lock();
        try {
            long parkTime = targetLeapTime - (nanoTime() - leapTime.get());
            if (parkTime > 0)
                parkNanos(parkTime);
            leapTime.set(nanoTime());
        } finally {
            rateLock.unlock();
        }
    }

    private static void overflow(Runnable r, RateFriendlyThreadPoolExecutor e) {
        if (!e.isShutdown())
            e.run(r);
    }
    
    private boolean isThroated(Exception e) {
        return e.getMessage().contains("Reduce your rate");
    }

    public void setRate(int rate) {
        this.rate.set(rate);
        minRate.compareAndSet(0, rate);
        maxRate.compareAndSet(0, rate);
    }

    public void setMinRate(int minRate) {
        this.minRate.set(minRate);
    }

    public void setMaxRate(int maxRate) {
        this.maxRate.set(maxRate);
    }
}

相关问题