这个实验特性应该是在Flink 1.5版本已经引进,但是直到现在(1.11)仍然是实验特性。官网对于它的描述 :这个特性仍然在不断的优化,目前是可能是不稳定、不兼容的,并且在以后的版本甚至发生大的改变。
将DataStream重新解释为KeyedStream,这种方式可以避免shuffle
。
那么自然它的使用也会受到相应的约束,这个只能去重新解释那些已经预分区的DataStream。
在源码中找到了这样一个测试代码,结果是:Tests passed
public class ReinterpretAsKeyedStreamDemo {
public void reinterpretAsKeyedStream() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3);
KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
});
SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
reducer.addSink(new PrintSinkFunction<>());
env.execute("xx");
}
}
上面结果我们可以看到 输出了2 4 6 其实就是
但是我们其实有9条数据,1,2,3分别是3组数据,为什么少输出呢?
因为前面两组1,2,3已经结束了一个窗口,满足同一个key下有两个数据,然后最后一组的1,2,3,并不满足有两个数据,无法触发窗口。
为了方便理解我们再次修改如下代码:
public void reinterpretAsKeyedStream() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3,2);
KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
});
SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
reducer.addSink(new PrintSinkFunction<>());
env.execute("xx");
}
运行结果如下
2
4
6
4
我们数据源数据里面最后加入了一个2,然后最后输出多了一个4。当然这个是countwindow的使用,因为官网例子给的不明确,这里只是简单给大家补充一下,便于理解,避免初次使用产生太多疑问。
代码功能:
ds1:DataStream[Event
]流,然后输出文件数据;ds2:DataStreamp[Event]
;最后我们继续使用一个窗口大小为2的CountWindow,然后对窗口内两条数据处理:
如果两条数据的event_type字段值不等,那么我们使用第一条数据的值去创建一个Event对象,然后新数据Event对象的event_type字段设置为3,并且把字段 v 设置为两个数据的字段 v 的字符串拼接;
如果event_type字段值相等,那么我们保留time_字段值大的一条数据。
第三步中,正常情况我们会对ds2进行keyby然后继续按照key 字段值hash,这样会产生相应的Shuffle,但是通过使用本文的实验特性reinterpretAsKeyedStream,可以避免Shuffle。
// scala
object SessionwindowingOriginal {
// 主函数
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.WARN)
val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(2)
env.setMaxParallelism(2)
// 从文件读取数据 ds1:DataStream[Event]类型
val ds1 = env.readTextFile("/Users/hehuiyuan/gitwarehouse/flinksql/src/main/resources/f1").map(e => {
val l = e.split(",")
val (key, time_, event_type, v, partition) = (l(0).trim, l(1).trim.toLong, l(2).trim.toInt, l(3).trim, l(4).trim)
Event(key, time_, event_type, v, partition)
}).name("f1_source")
//输出原始数据
ds1.addSink(new SinkFunction[Event] {
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("原始数据:"+value.toString)
}).name("origin_data_sink")
//按照event对象的key字段分组
// 相同key下 窗口大小数据量是2,然后取partition字段取最大的数据
val ds2 = ds1.keyBy(_.key).countWindow(2).max("partition")
// 输出ds2:DataStream[Event]
ds2.addSink(new SinkFunction[Event] {
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("ds2:"+value.toString)
}).name("ds2")
//ds2是DataSteam,ds1按照字段key分区处理后得到的流
//此时还想继续使用KededStream的一些操作,需要把ds2进行keyby
// 但是会存在shuffle,key不变情况下,我们可以直接把DataStream变为KeyedStream
val aggregated = new DataStreamUtils(ds2)
.reinterpretAsKeyedStream((event) => event.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event] {
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
}).name("result")
env.execute()
}
}
我们看一下读取的文件中的数据样式:
每一行都会被封装到一个Event对象中,然后构成DataStream。
//创建一个Pojo类,4个字段
case class Event(
key: String,
time_ : Long,
event_type: Int,
v: String,
partition: String
)
Event对象有五个字段,会使用逗号分割
输出结果:
原始数据:Event(a,1,1,banana,0)
原始数据:Event(b,21,2,tomato,0)
原始数据:Event(c,12,1,apple,0)
原始数据:Event(d,10,2,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
原始数据:Event(a,3,1,ba,1)
原始数据:Event(b,11,2,to,0)
原始数据:Event(c,42,1,ap,0)
原始数据:Event(d,20,2,or,0)
原始数据:Event(e,111,2,wa,0)
ds2:Event(a,1,1,banana,1)
ds2:Event(b,21,2,tomato,0)
ds2:Event(c,12,1,apple,0)
ds2:Event(d,10,2,orange,0)
ds2:Event(e,101,1,watermeleon,0)
原始数据:Event(a,2,1,ba,0)
原始数据:Event(b,2,2,to,0)
原始数据:Event(c,88,1,ap,0)
原始数据:Event(d,44,2,or,0)
原始数据:Event(e,11,2,wa,0)
原始数据:Event(a,33,1,banana,0)
原始数据:Event(b,21,2,tomato,1)
原始数据:Event(c,55,2,apple,0)
原始数据:Event(d,66,1,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
ds2:Event(a,2,1,ba,0)
ds2:Event(b,2,2,to,1)
Event(a,2,1,ba,0)
Event(b,21,2,tomato,0)
ds2:Event(c,88,1,ap,0)
Event(c,88,1,ap,0)
ds2:Event(d,44,2,or,0)
Event(d,44,2,or,0)
ds2:Event(e,11,2,wa,0)
Event(e,101,3,wa_watermeleon,0)
我们拿其中一个输出结果的数据简单分析一下:(上面最后一行)
Event(e,101,3,wa_watermeleon,0)
那么这个数据是如何输出的呢?
我们会发现ds1经过keyby以及counwindow后的max处理以后,留下了两条数据:
ds2:Event(e,101,1,watermeleon,0)
ds2:Event(e,11,2,wa,0)
紧接着,把数据流ds2转为keyedStream
,然后又做了一次CountWindow
操作,窗口大小是2,具体实现的代码我们下面分析,这里先把结果分析完:
因为上面对于key = e
下,满足了两条数据,也就是满足了countwindow的触发计算,这个时候会对这两个数据处理,根据我们第三步功能描述可知处理如下:
event_type = 1
event_type = 2
这两条数据的该字段不等,根据(3.1)可知,会创建一个新的Event对象,该对象的 event_type = 3, v = wa_watermeleon(两条数据的该字段的字符串拼接构成)
最终得到如下结果:
Event(e,101,3,wa_watermeleon,0)
最后,我们对ds2使用了本文的主要介绍的特性reinterpretAsKeyedStream进行分析,这个方法在DataStreamUtils中。
使用reinterpretAsKeyedStream的代码:
val aggregated = new DataStreamUtils(ds2)
.reinterpretAsKeyedStream((event) => event.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event] {
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
}).name("result")
不用reinterpretAsKeyedStream的代码:
val aggregated = ds2
.keyBy(_.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event] {
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
}).name("result")
在这里就涉及到使用reinterpretAsKeyedStream的优势了,可能代码你无法更好的体会,我们通过StreamGraph来了解这两者的区别:
图片可能有点小,我们把关键地方放大查看:
在这里我们可以发现,同样是Window Operator,但是第一个Window Operator 的数据是通过上游HASH过来的,第二个是通过FORWARD方式过来
。
两个Operator之间的边展示的关键词,其实展示了两个算子之间数据是如何传输的,在之前的文章提到过关于partition的概念以及Flink已经提供的实现,此处阅读 。
public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
DataStream<T> stream,
KeySelector<T, K> keySelector,
TypeInformation<K> typeInfo) {
PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
stream.getTransformation(),
new ForwardPartitioner<>());
return new KeyedStream<>(
stream,
partitionTransformation,
keySelector,
typeInfo);
}
上面代码是 方法reinterpretAsKeyedStream的具体实现,最后我们可以看到return了一个KeyedStream流
,创建这个流的时候首先创建了PartitionTransformation
对象,其中使用了ForwardPartitioner
分区器,那么FORWARD
其实也是来源于此。
我们看一下keyby操作如何生产KeyedStream的:
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(keyType);
return new KeyedStream<>(this, clean(key), keyType);
}
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
this(
dataStream,
new PartitionTransformation<>(
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
keySelector,
keyType);
}
上面代码我们可以看出,keyby同样构建keyedStream流,但是使用的分区器是KeyGroupStreamPartitioner。
转载:Flink实验特性–reinterpretAsKeyedStream
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_21383435/article/details/123782690
内容来源于网络,如有侵权,请联系作者删除!