我想知道flink的datastreamapi是否可以用来从传入的记录(可能是在特定的时间窗口内)中删除重复项,就像datasetapi提供了一个称为“distinct”的转换一样。或者无论如何,如果数据集可以转换为datastream,假设数据集在flink中转换为datastream进行内部处理。请帮帮我。提前谢谢!干杯!
rqdpfwrv1#
我不知道任何内置原语,但是如果窗口中的所有数据都适合内存,那么您可以自己轻松地构建这个函数。
DataStream<...> stream = ... stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new DistinctFunction<>()); public class DistinctFunction<T, W extends Window> extends ProcessAllWindowFunction<T, T, W> implements Function { public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception { Set<T> elements = new HashSet<>(); input.forEach(elements::add); elements.forEach(out::collect); } }
当然,如果你有一个键的话,它的可伸缩性要大得多,因为只有窗口中一个键的数据需要保存在内存中。
1条答案
按热度按时间rqdpfwrv1#
我不知道任何内置原语,但是如果窗口中的所有数据都适合内存,那么您可以自己轻松地构建这个函数。
当然,如果你有一个键的话,它的可伸缩性要大得多,因为只有窗口中一个键的数据需要保存在内存中。