所以我有一个阻塞队列实现。其中一个调度程序将一个随机数以1秒的延迟放入队列,我已经实现了另一个调度程序,其中包含10个线程池,用于从消息队列调用take()。
重要的一点是,我正在尝试实现的场景是,在从队列中获取单个项目之后,线程等待20秒(线程睡眠),而我的理解是,线程池中的其他9个线程将开始并行工作,而第一个线程等待20秒(其他线程也将等待20秒)但事实并非如此。池中的其他线程似乎根本没有启动。我是一个新手封锁队列和任何帮助将不胜感激。
我的代码如下。
公共类blockingqueueimpl{
public Queue<Integer> messageQueue = new ConcurrentLinkedDeque();
private void putNumber(Integer number){
try{
System.out.println("putting number to the queue: " + number);
messageQueue.add(number);
System.out.println("size of the queue: " +messageQueue.size());
} catch (Exception e){
e.printStackTrace();
}
}
private void getNumber(){
}
private class RunnableGetImpl implements Runnable {
@Override
public void run() {
try{
Integer num = messageQueue.poll();
System.out.println("Polling from queue, number - "+ num);
if(num!=null){
System.out.println("Sleeping thread for 20 sec"+Thread.activeCount());
Thread.sleep(20000);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
private class RunnablePutImpl implements Runnable {
@Override
public void run() {
Random rand = new Random();
int n = rand.nextInt(100);
n += 1;
putNumber(n);
}
}
public static void main(String[] args){
BlockingQueueImpl blockingQueue = new BlockingQueueImpl();
ScheduledExecutorService executor1 = Executors.newScheduledThreadPool(1);
executor1.scheduleAtFixedRate(blockingQueue.new RunnablePutImpl(), 0, 1000, TimeUnit.MILLISECONDS);
ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(20);
executor2.scheduleAtFixedRate(blockingQueue.new RunnableGetImpl(), 0, 100, TimeUnit.MILLISECONDS);
}
}
1条答案
按热度按时间ndasle7k1#
来自
ScheduledThreadPoolExecutor.scheduleAtFixedRate
:如果此任务的任何执行时间长于其周期,则后续执行可能会延迟开始,但不会同时执行。
因此,你需要开始(安排)尽可能多的工人。
为了寻找更好的解决方案,请注意,您实际上并没有使用
BlockingQueue
你不执行java.util.concurrent.Blockingqueue
,也没有使用它的实现。ConcurrentLinkedDeque
只是一个集合,它甚至没有实现Queue
.ConcurrentLinkedDeque.poll()
不会阻塞,只会返回null
如果队列为空。这些是
BlockingQueue
接口:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/blockingqueue.html使用
put()
向队列提供值。如果BlockingQueue
已达到最大容量。使用take()
删除元素。如果队列为空,这将阻塞。正确使用这些类将提高应用程序的性能,因为您不会一直轮询某个值。
关于一个类似问题的答案中提供了更多细节:如何使用concurrentlinkedqueue?
更新:具有多个生产者/消费者的示例代码
下面的示例代码是从https://riptutorial.com/java/example/13011/multiple-producer-consumer-example-with-shared-global-queue 与我没有任何联系:
下面的代码展示了多个生产者/消费者程序。生产者线程和使用者线程共享相同的全局队列。
输出:
请注意如何将多个生产者和消费者添加到池中—您需要尽可能多的生产者和消费者才能并行工作。这是您的代码缺少的关键—多个工人。调度器将对它们进行调度,但它不会神奇地将您要求它调度的单个示例相乘。
很明显,您需要根据您的需求调整生产商和消费者的数量。