我正在尝试在flink job的窗口函数中使用hashmap。所有并行运算符中的所有元素能否存储在一个运算符上的hashmap中?
public class SeewoUserWindowFunction implements WindowFunction<ObjectNode, LabelInfo, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(SeewoUserWindowFunction.class);
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<ObjectNode> iterable, Collector<LabelInfo> collector) throws Exception {
try {
HashMap<String, LabelInfo> result = new HashMap<>();
iterable.forEach(e -> {
String key = e.get("value").get("$tid").toString() + "/" + e.get("value").get("$code").toString();
if (result.containsKey(key)) {
result.put(key, result.get(key).update(e, timeWindow.getEnd()));
} else {
result.put(key, LabelInfo.of(e, timeWindow.getEnd()));
}
});
result.values().stream().forEach(labelInfo -> collector.collect(labelInfo));
} catch (Exception exception) {
logger.error("parse exception!", exception);
}
}
}
2条答案
按热度按时间w1e3prcc1#
在您的例子中,每个并行运算符都只保留自己的
HashMap
,但这在很大程度上取决于流的分区。这里有一个类似的问题来解释操作员之间的通信。如果您出于某种原因希望能够保留流中的所有元素HashMap
使用parallelism > 1
. 你可以打电话global()
在您的数据流上,这将导致流的所有元素只经过并行运算符的一个示例,这将基本上允许您将所有流元素存储在HashMap
,但请记住,这可能会在吞吐量和延迟方面产生可怕的后果。iq3niunx2#
你可以用
org.apache.flink.streaming.api.datastream.DataStream#windowAll
方法将所有元素聚集到全局窗口中。请参阅此文档。