在blockingqueue为空时杀死消费者

anhgbhbe  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(395)

我在读blockingqueue,ExecuteService和生产者-消费者范式。我希望有一个不断变化的生产者数量,和不断变化的消费者数量。每个生产者将附加到队列中,消费者将使用消息并对其进行处理。我的问题是-生产者如何知道消费者已经完成了,而不再有消息进入队列?我想在我的主线程中添加一个计数器。当一个producer启动时,我将增加计数器,当每个producer结束时,他们将减少int。我的消费者将能够知道计数器,当计数器达到0,并且队列中没有更多的元素时,他们可以死亡。
关于同步工作的另一个一般性问题是:主线程应该读取队列的内容,并为每条消息添加执行器,还是让线程知道这个逻辑并自行决定何时终止是最佳做法?
当系统启动时,我会收到一个数字,决定有多少生产商将启动。每个生产者将在队列中生成一组随机数字。消费者将把这些数字打印到日志中。我面临的问题是,一旦我知道最后一个生产商推出了最后一个数字,我仍然不明白如何让消费者知道,不会再有更多的数字进来,他们应该关闭。
消费者怎么知道生产者什么时候完蛋了?

dtcbnfnu

dtcbnfnu1#

当生产者完成,最后一个可以中断所有消费者和(可能)生产者。 InterruptedException 每当阻塞调用(无论是 put() 或者 take() )被另一个线程通过 thread.interrupt() ,在哪里 thread 是调用该方法的线程。当最后一个生产者完成时,它可以中断所有其他线程,这将导致所有阻塞方法抛出 InterruptedException ,允许您终止相应的线程。

final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();

threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);

// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);

class Producer extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < X; i++) {
            T element;
            // Produce element
            try {
                queue.put(element);
            } catch (InterruptedException e) {
                break; // Optional, only if other producers may still be running and
                       // you want to stop them, or interruption is performed by
                       // a completely different thread
            }
        }
    }
}

class Consumer extends Thread {
    @Override
    public void run() {
        while (true) {
            T element;
            try {
                element = queue.take();
            } catch (InterruptedException e) {
                break;
            }
            // Consume element
        }
    }
}
yqhsw0fo

yqhsw0fo2#

一个优雅的解决方案是使用毒丸模式。下面是一个如何工作的例子。在这种情况下,你只需要知道生产商的数量。
编辑:我更新了代码,以便在最后一个消费者完成工作时清除队列。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillsTests {

    interface Message {

    }

    interface PoisonPill extends Message {
        PoisonPill INSTANCE = new PoisonPill() {
        };
    }

    static class TextMessage implements Message {

        private final String text;

        public TextMessage(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    static class Producer implements Runnable {

        private final String producerName;
        private final AtomicInteger producersCount;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
            this.producerName = producerName;
            this.messageBlockingQueue = messageBlockingQueue;
            this.producersCount = producersCount;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
                }
                if (producersCount.decrementAndGet() <= 0) {
                    //we need this producersCount so that the producers to produce a single poison pill
                    messageBlockingQueue.put(PoisonPill.INSTANCE);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Producer interrupted", e);
            }
        }
    }

    static class Consumer implements Runnable {

        private final AtomicInteger consumersCount;
        private final AtomicInteger consumedMessages;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
            this.messageBlockingQueue = messageBlockingQueue;
            this.consumersCount = consumersCount;
            this.consumedMessages = consumedMessages;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Message message = null;
                    message = messageBlockingQueue.take();

                    if (message instanceof PoisonPill) {
                        //we put back the poison pill so that to be consumed by the next consumer
                        messageBlockingQueue.put(message);
                        break;
                    } else {
                        consumedMessages.incrementAndGet();
                        System.out.println("Consumer got message " + message);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Consumer interrupted", e);
            } finally {
                if (consumersCount.decrementAndGet() <= 0) {
                    System.out.println("Last consumer, clearing the queue");
                    messageBlockingQueue.clear();
                }
            }
        }
    }

    public static void main(String[] args) {

        final AtomicInteger producerCount = new AtomicInteger(4);
        final AtomicInteger consumersCount = new AtomicInteger(2);
        final AtomicInteger consumedMessages = new AtomicInteger();
        BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();

        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < producerCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
        }

        for (int i = 0; i < consumersCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
        }

        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        System.out.println("Consumed " + consumedMessages + " messages");

    }
}

相关问题