如何实现线程安全收集器?

falq053o  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(337)

我想要类似的东西 Collectors.maxBy() ,获取集合中的顶级元素的收集器( maxBy 只有一个)。
我有一条小溪 Possibility 可以用一个 Integer score(Possibility) 方法。
首先我试着:

List<Possibity> possibilities = getPossibilityStream()
    .parallel()
    .collect(Collectors.toList());

if(!possibilities.isEmpty()) {
    int bestScore = possibilities.stream()
        .mapToInt(p -> score(p))
        .max()
        .getAsInt();
    possibilities = possibilities.stream()
        .filter(p -> score(p)==bestScore)
        .collect(Collectors.toList());
}

但这样做,我扫描了三次收集。一次构建它,第二次获得最高分数,第三次过滤它,这不是最佳的。此外,可能性的数量可能是巨大的(>1012)。
最好的方法应该是在第一次收集中直接获得最高的可能性,但是似乎没有内置的收集器来做这样的事情。
所以我实现了我自己的 Collector :

public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

    private final Comparator<E> comparator;

    private final Class<? extends List> listImpl ;

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
        this.comparator = comparator;
        this.listImpl = listImpl;
    }

    public BestCollector(Comparator<E> comparator) {
        this.comparator= comparator;
        listImpl = ArrayList.class;
    }

    @Override
    public Supplier<List<E>> supplier() {
        return () -> {
            try {
                return listImpl.newInstance();
            } catch (InstantiationException | IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Override
    public BiConsumer<List<E>, E> accumulator() {
        return (list, e) -> {
            if (list.isEmpty()) {
                list.add(e);
            } else {
                final int comparison = comparator.compare(list.get(0), e);
                if (comparison == 0) {
                    list.add(e);
                } else if (comparison < 0) {
                    list.clear();
                    list.add(e);
                }
            }
        };
    }

    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        };
    }

    @Override
    public Function<List<E>, List<E>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
    }
}

然后:

List<Possibity> possibilities = getPossibilityStream()
    .parallel()
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));

这是以顺序模式完成的(没有 .parallel() )但在并行模式中,偶尔有两个例外:
java.lang.IndexOutOfBoundsException Index: 0, Size: 0 在队列中:

final int comparison = comparator.compare(list.get(0), e);

accumulator() 方法
我知道当一个 list.clear() 被称为中间人 list.isEmpty() 以及 list.get(0) .
java.lang.NullPointerException 因为可能性是 null . 同样地,这条线也包括在内:

final int comparison = comparator.compare(list.get(0), e);

我不明白怎么做 list.get(0) 可能会回来 null ...
在并行模式下,有时 list.get(0) 提高 IndexOutOfBoundsException 有时还会回来 null .
我知道我的代码不是线程安全的,所以我尝试了几种解决方案:
添加 synchronized 在bestcollector的所有方法中: public synchronized … 使用线程安全集合而不是 ArrayList : java.util.concurrent.CopyOnWriteArrayList 添加 synchronized 使用 CopyOnWriteArrayList 同时
删除 Characteristics.CONCURRENT 在外面 Set<Characteristics>characteristics() 方法

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}

但我不知道 Characteristics.CONCURRENT 在这里表示我的代码是线程安全的,或者我的代码将用于并发处理。
但这些解决方案都没有真正解决问题。
事实上,当我去掉并发特征时,有时会有一个 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 但话说回来:

final int comparison = comparator.compare(l1.get(0), l2.get(0));

combiner() 方法。
然而,政府提出的例外情况 accumulator() 方法似乎不再出现了。
@霍尔格的回答是对的。
完整的解决方案是改变两者 combiner() 以及 characteristics() 方法:

@Override
public BinaryOperator<List<E>> combiner() {
    return (l1, l2) -> {
        if (l1.isEmpty()) {
            return l2;
        } else if (l2.isEmpty()) {
            return l1;
        } else {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        }
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
jtw3ybtb

jtw3ybtb1#

您的代码只有一个重大错误:如果收集器不是线程安全的,则不应报告 Characteristics.CONCURRENT 因为这正是声称它是线程安全的。
你必须明白的一点是,对于非- CONCURRENT 在收集器中,框架将执行必要的步骤,以线程安全但仍然有效的方式使用它:
对于每个工作线程,将通过 supplier() 每个工人将使用 accumulator() 与自己的本地容器一起运行
这个 combiner() 将在两个工作线程完成工作后使用
这个 finisher() 将在所有工作线程完成其工作并且合并了所有容器时使用
因此,您所要做的就是确保您的供应商在每次调用时真正返回一个新示例,并且所有函数都是无干扰和无副作用的(除了作为参数接收的容器之外的任何其他函数),当然,不报告 Characteristics.CONCURRENT 当您的收集器不是并发收集器时。
你不需要 synchronized 关键字或并发集合。
顺便说一下,一个 Comparator 窗体的 (p1, p2) -> score(p1).compareTo(score(p2)) 可以使用 Comparator.comparing(p -> score(p)) 或者如果得分值是 int : Comparator.comparingInt(p -> score(p)) .
最后,combiner函数不会检查其中一个列表是否为空。这很好地解释了 IndexOutOfBoundsExceptioncombinerIndexOutOfBoundsExceptionaccumulator 是你的收集者报告的结果吗 Characteristics.CONCURRENT
了解添加 synchronized 关键字到 accumulator() 或者 combiner() 方法不保护通过lambda表达式构造的函数。它将保护构造函数示例的方法,而不是函数的代码本身。与内部类不同的是,在 synchronized 关键字到实际函数的实现方法。

相关问题