flink在hashmap中保留所有流元素

a6b3iqyw  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(349)

我正在尝试在flink job的窗口函数中使用hashmap。所有并行运算符中的所有元素能否存储在一个运算符上的hashmap中?

public class SeewoUserWindowFunction implements WindowFunction<ObjectNode, LabelInfo, String, TimeWindow> {

    private static final Logger logger = LoggerFactory.getLogger(SeewoUserWindowFunction.class);
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<ObjectNode> iterable, Collector<LabelInfo> collector) throws Exception {
        try {
            HashMap<String, LabelInfo> result = new HashMap<>();
            iterable.forEach(e -> {
                    String key = e.get("value").get("$tid").toString() + "/" + e.get("value").get("$code").toString();
                    if (result.containsKey(key)) {
                        result.put(key, result.get(key).update(e, timeWindow.getEnd()));
                    } else {
                        result.put(key, LabelInfo.of(e, timeWindow.getEnd()));
                    }
            });
            result.values().stream().forEach(labelInfo -> collector.collect(labelInfo));
        } catch (Exception exception) {
            logger.error("parse exception!", exception);
        }
    }
}
w1e3prcc

w1e3prcc1#

在您的例子中,每个并行运算符都只保留自己的 HashMap ,但这在很大程度上取决于流的分区。这里有一个类似的问题来解释操作员之间的通信。如果您出于某种原因希望能够保留流中的所有元素 HashMap 使用 parallelism > 1 . 你可以打电话 global() 在您的数据流上,这将导致流的所有元素只经过并行运算符的一个示例,这将基本上允许您将所有流元素存储在 HashMap ,但请记住,这可能会在吞吐量和延迟方面产生可怕的后果。

iq3niunx

iq3niunx2#

你可以用 org.apache.flink.streaming.api.datastream.DataStream#windowAll 方法将所有元素聚集到全局窗口中。
请参阅此文档。

相关问题