仅包含唯一元素的Java阻塞队列

ccgok5k5  于 2023-03-06  发布在  Java
关注(0)|答案(6)|浏览(146)

有点像一个“阻塞集”。我如何实现一个阻塞队列,其中添加一个已经在集中的成员被忽略?

lstz6jyr

lstz6jyr1#

我写这个类是为了解决一个类似的问题:

/**
 * Linked blocking queue with {@link #add(Object)} method, which adds only element, that is not already in the queue.
 */
public class SetBlockingQueue<T> extends LinkedBlockingQueue<T> {

    private Set<T> set = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /**
     * Add only element, that is not already enqueued.
     * The method is synchronized, so that the duplicate elements can't get in during race condition.
     * @param t object to put in
     * @return true, if the queue was changed, false otherwise
     */
    @Override
    public synchronized boolean add(T t) {
        if (set.contains(t)) {
            return false;
        } else {
            set.add(t);
            return super.add(t);
        }
    }

    /**
     * Takes the element from the queue.
     * Note that no synchronization with {@link #add(Object)} is here, as we don't care about the element staying in the set longer needed.
     * @return taken element
     * @throws InterruptedException
     */
    @Override
    public T take() throws InterruptedException {
        T t = super.take();
        set.remove(t);
        return t;
    }
}
soat7uwm

soat7uwm2#

你可以创建一个新的类来组成一个BlockingQueue、一个Set和一个lock。当你使用put()时,你会在持有一个阻止get()运行的锁的同时对set进行测试。当你使用get()时,你会从set中移除该项,以便将来可以再次使用put()。

ctrmrzij

ctrmrzij3#

一种由链接散列集支持的阻塞队列实现,用于可预测的迭代顺序和恒定时间的添加、移除,并且包含操作:
There you go.

kgqe7b3p

kgqe7b3p4#

我也在处理同样的问题,之前提出的解决方案有一些问题,比如集合和队列没有保持相同的元素。
我不能100%肯定这是否必要,但我猜是必要的。而且,它没有考虑其他方法,即要约和看跌期权。
如果我们使用'BlockingQueue'作为后备队列来保持同步,则相关方法都不起作用。内部队列的锁与监视器锁以及阻塞方法之间的交互冲突
最后,我必须基于jdk中已经存在的阻塞队列来实现类似的东西(在UniqueBlockingQueue中的完整实现和在UniqueBlockingQueueTest中的测试)。

public class UniqueBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Condition notFull = lock.newCondition();

  private HashSet<E> set = new HashSet<>();
  private LinkedList<E> queue = new LinkedList<>();

  private final int capacity;
  private final AtomicInteger count = new AtomicInteger();

  @Override
  public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);

    final var lock = this.lock;
    final var count = this.count;
    lock.lockInterruptibly();
    try {
        while (count.get() == capacity || set.contains(e)) {
            notFull.await();
        }
        queue.add(e);
        set.add(e);

        final var c = count.getAndIncrement();
        if (c + 1 < capacity) {
            notFull.signal();
        }
        if (c == 0) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }
  }

  @Override
  public E take() throws InterruptedException {
    final var lock = this.lock;
    final var count = this.count;
    lock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        var e = queue.poll();
        set.remove(e);

        final var c = count.getAndDecrement();
        if (c > 1) {
            notEmpty.signal();
        }
        if (c == capacity) {
            notFull.signal();
        }
        return e;
   } finally {
       lock.unlock();
   }
  }
}
pcrecxhr

pcrecxhr5#

你可以覆盖BlockingQueue<T>的任何实现的add和put方法,首先检查元素是否已经在队列中,例如。

@Override
public boolean add(T elem) {
    if (contains(elem))
        return true;
    return super.add(elem);
}
hzbexzde

hzbexzde6#

class BlockingSet extends ArrayBlockingQueue<E> {
   /*Retain all other methods except put*/
   public void put(E o) throws InterruptedException {
      if (!this.contains(o)){
         super.put(o);
      }
   }
}

相关问题