我试图利用并行性来加速前10个窗口操作。我的应用程序由具有时间戳和密钥的事件和(即。, Tuple2<Long,String>
)我的目标是为30分钟的滚动窗口(使用事件时间)生成前10个最频繁的键。为此,我的查询由入口、窗口和聚合阶段组成。换句话说,我的代码需要执行以下操作:
DataStream<Tuple3<Long, String, Integer>> s = env
.readTextFile("data.csv")
.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
@Override
public Tuple3<Long, String, Integer> map(String s) throws Exception {
String[] tokens = s.split(",");
return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
tokens[1], 1);
}})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
return t.f0;
}}).setParallelism(1);
上面是解析csv文件中的数据并分配事件时间(即入口)的代码。之所以将parallelism设置为1,是因为我需要事件按顺序显示,以便将它们分配给windows。
接下来是棘手的部分,我尝试在生成正确(有序)的窗口结果的同时加快执行速度。
原始(串行)执行
以下代码提供了一个不使用任何并行性并生成串行流的解决方案:
DataStream<Tuple2<Long, String>> windowedTopTen = s
.windowAll(TumblingEventTimeWindows.of(Time.minutes(30)))
.apply(new SerialAggregation()).setParallelism(1);
哪里 SerialAggregation
延伸 RichAllWindowFunction<Tuple3<Long, String, Integer>, Tuple2<Long, String>, TimeWindow>
每扇翻滚的Windows Tuple2<Long, String>
( Long
是时间戳和 String
包含前10个键)。
naive方法生成正确的结果,结果数据流按时间戳升序排序。不幸的是,它没有利用多线程,因此当输入数据是一些gbs时,执行需要一段时间才能完成。
并行(更快)进近
在查看了flink在windows上的文档之后,我试图通过使用 parallelism > 1
同时为每个窗口生成正确的结果。因此,我看到我需要转变 s
到 KeyedStream
然后应用 window()
转变。本质上:
DataStream<Tuple2<Long, String>> windowedTopTen = s
.keyBy(1)
.window(TumblingEventTimeWindows.of(Time.minutes(30)))
.apply(new PartialAggregation()).setParallelism(N);
哪里 PartialAggregation()
将为不同的时间戳生成部分结果(不相交的键集)。换句话说,我的理解是,对于相同的时间戳 t1
我会以 partial_result_1
至 partial_result_N
哪里 N
是我设定的平行度。我的目标是聚合特定时间戳的所有部分结果(如 t1
),但我不知道怎么做。另外,当我能够将部分结果与匹配的时间戳相结合时,我将如何生成一个数据流,它的元组是基于时间戳排序的(就像原始解决方案生成的结果一样)。
问题
如何完成并行(更快)方法以生成所需的结果并将部分结果与匹配的时间戳相结合?
在我为每个时间戳合并部分结果之后,有没有一种方法可以生成一个数据流,其中的结果根据时间戳排序?
1条答案
按热度按时间h43kikqp1#
首先,如果将tuple2替换为tuple3,其中字符串是单个键,整数是计数器,那么将部分前10个结果合并到整体前10个结果会更容易。
然后,您可以使用windowall和聚集窗口函数添加第二层窗口,该函数保留前10个键(总键数)及其计数。