/**
* Linked blocking queue with {@link #add(Object)} method, which adds only element, that is not already in the queue.
*/
public class SetBlockingQueue<T> extends LinkedBlockingQueue<T> {
private Set<T> set = Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* Add only element, that is not already enqueued.
* The method is synchronized, so that the duplicate elements can't get in during race condition.
* @param t object to put in
* @return true, if the queue was changed, false otherwise
*/
@Override
public synchronized boolean add(T t) {
if (set.contains(t)) {
return false;
} else {
set.add(t);
return super.add(t);
}
}
/**
* Takes the element from the queue.
* Note that no synchronization with {@link #add(Object)} is here, as we don't care about the element staying in the set longer needed.
* @return taken element
* @throws InterruptedException
*/
@Override
public T take() throws InterruptedException {
T t = super.take();
set.remove(t);
return t;
}
}
public class UniqueBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private HashSet<E> set = new HashSet<>();
private LinkedList<E> queue = new LinkedList<>();
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
@Override
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final var lock = this.lock;
final var count = this.count;
lock.lockInterruptibly();
try {
while (count.get() == capacity || set.contains(e)) {
notFull.await();
}
queue.add(e);
set.add(e);
final var c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
if (c == 0) {
notEmpty.signal();
}
} finally {
lock.unlock();
}
}
@Override
public E take() throws InterruptedException {
final var lock = this.lock;
final var count = this.count;
lock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
var e = queue.poll();
set.remove(e);
final var c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
if (c == capacity) {
notFull.signal();
}
return e;
} finally {
lock.unlock();
}
}
}
class BlockingSet extends ArrayBlockingQueue<E> {
/*Retain all other methods except put*/
public void put(E o) throws InterruptedException {
if (!this.contains(o)){
super.put(o);
}
}
}
6条答案
按热度按时间lstz6jyr1#
我写这个类是为了解决一个类似的问题:
soat7uwm2#
你可以创建一个新的类来组成一个BlockingQueue、一个Set和一个lock。当你使用put()时,你会在持有一个阻止get()运行的锁的同时对set进行测试。当你使用get()时,你会从set中移除该项,以便将来可以再次使用put()。
ctrmrzij3#
一种由链接散列集支持的阻塞队列实现,用于可预测的迭代顺序和恒定时间的添加、移除,并且包含操作:
There you go.
kgqe7b3p4#
我也在处理同样的问题,之前提出的解决方案有一些问题,比如集合和队列没有保持相同的元素。
我不能100%肯定这是否必要,但我猜是必要的。而且,它没有考虑其他方法,即要约和看跌期权。
如果我们使用'BlockingQueue'作为后备队列来保持同步,则相关方法都不起作用。内部队列的锁与监视器锁以及阻塞方法之间的交互冲突
最后,我必须基于jdk中已经存在的阻塞队列来实现类似的东西(在UniqueBlockingQueue中的完整实现和在UniqueBlockingQueueTest中的测试)。
pcrecxhr5#
你可以覆盖
BlockingQueue<T>
的任何实现的add和put方法,首先检查元素是否已经在队列中,例如。hzbexzde6#