java阻塞队列递增计数问题

e4eetjau  于 2021-07-04  发布在  Java
关注(0)|答案(1)|浏览(357)

我有3个生产者线程和2个消费者线程。问题是输出/控制台中的计数。每个线程只计算他们制作的汉堡包(burger),我如何才能得到想要的输出值?我认为问题出在价值观上的生产者阶层。我想用阻塞队列来解决这个问题。

Output from console:
Kokk3 legger på hamburger (1) >> [1]
Kokk1 legger på hamburger (1) >> [1]
Servitør1 tar av hamburger (1) >> [1]
Kokk2 legger på hamburger (1) >> [1, 1]
Servitør2 tar av hamburger (1) >> [1]
Servitør1 tar av hamburger (1) >> []
Kokk2 legger på hamburger (2) >> [2]
Kokk1 legger på hamburger (2) >> [2, 2]
Kokk3 legger på hamburger (2) >> [2, 2, 2]
Kokk1 legger på hamburger (3) >> [2, 2, 2, 3]
Servitør1 tar av hamburger (2) >> [2, 2, 3]
Servitør2 tar av hamburger (2) >> [2, 3]

Wanted output instead:
Kokk3 legger på hamburger (1) >> [1]
Kokk1 legger på hamburger (2) >> [1, 2]
Servitør1 tar av hamburger (1) >> [2]
Kokk2 legger på hamburger (3) >> [2, 3]
Servitør2 tar av hamburger (2) >> [3]
Servitør1 tar av hamburger (3) >> []
Kokk2 legger på hamburger (4) >> [4]
Kokk1 legger på hamburger (5) >> [4, 5]
Kokk3 legger på hamburger (6) >> [4, 5, 6]
Kokk1 legger på hamburger (7) >> [4, 5, 6, 7]
Servitør1 tar av hamburger (4) >> [5, 6, 7]
Servitør2 tar av hamburger (5) >> [6, 7]
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);

       //Oppretter trådene
       Thread kokk1= new Thread(new Producer(queue), "Kokk1");
       Thread kokk2=new Thread(new Producer(queue), "Kokk2");
       Thread kokk3=new Thread(new Producer(queue), "Kokk3");

        Thread servitoer1=new Thread(new Consumer(queue), "Servitør1");
        Thread servitoer2=new Thread(new Consumer(queue), "Servitør2");             

        //Starter trådene
        kokk1.start();
        kokk2.start();
        kokk3.start();

        servitoer1.start();
        servitoer2.start();
    }
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Integer> queue;
    int value=1;   

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue; 
    }

    @Override
    public void run() {
        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void process() throws InterruptedException {    

        while (true) {
            queue.put(value);
            System.out.println(Thread.currentThread().getName()+" legger på hamburger " + "("+value+")"+" >> "+queue);
            value++;
            Thread.sleep(vente());
        }
    }

  //wait between 2 and 6 seconds
    public int vente() {
        Random r=new Random ();
        int min=2000;
        int max=6000;
        int result=r.nextInt(max-min) + min;
        return result;
    }

}
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void process(Integer take) throws InterruptedException {
        System.out.println(Thread.currentThread().getName()+" tar av hamburger " + "("+take+")"+" >> "+queue);
        Thread.sleep(vente());
    }

    //wait between 2 and 6 seconds
    public int vente() {
        Random r=new Random ();
        int min=2000;
        int max=6000;
        int result=r.nextInt(max-min) + min;
        return result;
    }
}
omhiaaxx

omhiaaxx1#

你宣布 Producer 的值为 int value=1; ,所以每个 Producer 正在保存自己的 value . 你想要的结果表明你想要所有 Producer 所以我们分享一个 value .
在单线程应用程序中,这很简单,只需将声明更改为 static int value = 1; ,但这样做会引入竞争条件。
最简单的方法是使用 AtomicInteger . 一 AtomicInteger 保证一次只有一个线程可以读/写变量,这消除了数据不一致性。此外, AtomicInteger 还可以保证每个线程的可见性,因此您不必担心 volatile -城市。
编辑:如果您想要一个有保证的插入顺序,您可以将原子操作放在enqueue操作中( queue.put(value.getAndIncrement()) ),但这不允许您访问为日志记录插入的元素。为此,您需要以原子方式执行所有三个步骤(获取/递增、排队和打印),这需要一个静态同步方法。
您可能需要重构 queue 作为一个静态变量,使之工作,或添加一个静态变量 Object 在同步块上充当锁,而不是创建同步方法。

import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    private final static BlockingQueue<Integer> queue;
    private final static AtomicInteger value = new AtomicInteger(1);

    // Other things...

    private void process() throws InterruptedException {    

        while (true) {
            insertValue();
            Thread.sleep(vente());
        }
    }

    private static synchronized void insertValue() {
        int insertValue = value.getAndIncrement(); // Gets value then increments it before another thread can access
        queue.put(insertValue);
        System.out.println(Thread.currentThread().getName()+" legger på hamburger " + "("+insertValue+")"+" >> "+queue);
    }

    // Other things...
}

相关问题