我正与生产者和消费者的问题作斗争。当我运行一个生产者和多个消费者时,问题就存在了。只有一个使用者使用缓冲区。据我所知,这可能是我的缓冲区实现的问题。我怎样才能解决这个问题?如果不是缓冲我做错了什么?
class Producer extends Thread {
private final Buffer _buf;
private final int maxSize;
private final String name;
public Producer(Buffer _buf, int maxSize, String name) {
super(name);
this._buf = _buf;
this.maxSize = maxSize;
this.name = name;
}
@Override
public void run() {
synchronized (_buf) {
for (; ; ) {
while (_buf.isFull()) {
try {
System.out.println("Buffer is full, " + "Producer thread waiting for " + "consumer to take something from buffer");
_buf.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println(this.name + " producing value " + i);
_buf.put(i);
_buf.notifyAll();
try {
Thread.sleep(200);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}
}
}
class Consumer extends Thread {
private final Buffer _buf;
private final int maxSize;
private final String name;
public Consumer(Buffer _buf, int maxSize, String name) {
super(name);
this._buf = _buf;
this.maxSize = maxSize;
this.name = name;
}
@Override
public void run() {
synchronized (_buf) {
for (; ; ) {
while (_buf.isEmpty()) {
System.out.println("Buffer is empty," + "Consumer thread is waiting" + " for producer thread to put something in buffer");
try {
_buf.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println(this.name + ": consuming value " + _buf.get());
try {
Thread.sleep(1000);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
_buf.notifyAll();
}
}
}
}
class Buffer {
public synchronized void put(int i) {
// check for queue overflow
if (isFull()) {
System.out.println("Overflow\nProgram Terminated");
System.exit(1);
}
System.out.println("Inserting " + i);
rear = (rear + 1) % capacity;
arr[rear] = i;
count++;
notifyAll();
}
public synchronized int get() {
if (isEmpty()) {
System.out.println("Underflow\nProgram Terminated");
System.exit(1);
}
int result = arr[front];
System.out.println("Removing " + arr[front]);
front = (front + 1) % capacity;
count--;
notifyAll();
return result;
}
private final int[] arr; // array to store queue elements
private int front; // front points to the front element in the queue
private int rear; // rear points to the last element in the queue
private final int capacity; // maximum capacity of the queue
private int count; // current size of the queue
// Constructor to initialize a buffer queue
Buffer(int size) {
arr = new int[size];
capacity = size;
front = 0;
rear = -1;
count = 0;
}
public int size() {
return count;
}
public Boolean isEmpty() {
return (size() == 0);
}
public Boolean isFull() {
return (size() == capacity);
}
}
public class PKmain {
public static void main(String[] args) {
int maxSize = 100;
Buffer buffer = new Buffer(10);
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer1 = new Consumer(buffer, maxSize, "CONSUMER 1");
Thread consumer2 = new Consumer(buffer, maxSize, "CONSUMER 2");
Thread consumer3 = new Consumer(buffer, maxSize, "CONSUMER 3");
Thread consumer4 = new Consumer(buffer, maxSize, "CONSUMER 4");
Thread consumer5 = new Consumer(buffer, maxSize, "CONSUMER 5");
Thread consumer6 = new Consumer(buffer, maxSize, "CONSUMER 6");
producer.start();
consumer1.start();
consumer2.start();
consumer3.start();
consumer4.start();
consumer5.start();
consumer6.start();
}
}
下面是控制台输出:
Inserting -1893944
PRODUCER producing value 1150242252
Inserting 1150242252
PRODUCER producing value 957139043
Inserting 957139043
PRODUCER producing value -806406909
Inserting -806406909
PRODUCER producing value 1701947892
Inserting 1701947892
PRODUCER producing value -174867893
Inserting -174867893
PRODUCER producing value 1272708996
Inserting 1272708996
PRODUCER producing value -1522880833
Inserting -1522880833
PRODUCER producing value -1041643777
Inserting -1041643777
PRODUCER producing value 1741137093
Inserting 1741137093
Buffer is full, Producer thread waiting for consumer to take something from buffer
Removing -1893944
CONSUMER 6: consuming value -1893944
Removing 1150242252
CONSUMER 6: consuming value 1150242252
Removing 957139043
CONSUMER 6: consuming value 957139043
Removing -806406909
CONSUMER 6: consuming value -806406909
Removing 1701947892
CONSUMER 6: consuming value 1701947892
Removing -174867893
CONSUMER 6: consuming value -174867893
Removing 1272708996
CONSUMER 6: consuming value 1272708996
Removing -1522880833
CONSUMER 6: consuming value -1522880833
Removing -1041643777
CONSUMER 6: consuming value -1041643777
Removing 1741137093
CONSUMER 6: consuming value 1741137093
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value -1656771306
Inserting -1656771306
PRODUCER producing value 146381233
Inserting 146381233
PRODUCER producing value -303301670
Inserting -303301670
...
谢谢!!!
2条答案
按热度按时间vecaoik11#
消费者不能并行处理缓冲区的原因是
synchronized (_buf)
部分。因此,虽然锁是由单个使用者获取的,但其他使用者都无法处理。作为一种解决方案,我建议在缓冲区操作(也存在于您的代码中)中的锁定方面,消除使用者和生产者端的锁定。代码可以如下所示:生产商部分:
消费者部分:
缓冲部分:
2w3kk1z52#
只有一个使用者使用缓冲区。据我所知,这可能是我的缓冲区实现的问题。我怎样才能解决这个问题?
对于要同时处理缓冲区的多个使用者,您需要释放对的锁定
_buf
. 在您的消费代码中,您正在模拟睡眠工作。睡眠不能在身体内部synchronized
块:在你的生产者代码中,你也睡在
synchronized
阻止。你应该把锁关上,然后睡在外面,然后再进入房间synchronized
阻止。要修复消费者,您应该更改消费者代码,以便有2个
synchronized
块:一个等待并获取缓冲区,另一个通知其他人缓冲区可用。比如:
如果我们看看
Buffer
,任何看它的状态的东西都需要synchronized
. 这意味着size()
,isEmpty()
,和isFull()
还需要synchronized
.当我用这些修改运行代码时,我看到:
其他评论:
别接住
Exception
,接住InterruptedException
一定要做一个Thread.currentThread().interrupt()
捕捉后重新中断线程。小心使用
System.out.println(...)
因为它是一个可以改变线程计时的同步调用。这个
Buffer
is方法应返回boolean
而不是Boolean
.我想这是个练习。如果你真的这么做了,我建议你用
BlockingQueue
它负责在线程之间共享数据,并为您锁定和发送信号。