我试着用 CoGroupFunction
在Flink。
我有两条小溪;它们是:;
s1级
val m = env
.addSource(new FlinkKafkaConsumer010[String]("topic-1", schema, props))
.map(gson.fromJson(_, classOf[Master]))
.assignAscendingTimestamps(_.time)
s2级
val d = env
.addSource(new FlinkKafkaConsumer010[String]("topic-2", schema, props))
.map(gson.fromJson(_, classOf[Detail]))
.assignAscendingTimestamps(_.time)
还有我的 coGroup
实施是;
class MasterDetailOuterJoin extends CoGroupFunction[Master, Detail,
(Master, Option[Detail])] {
override def coGroup(
leftElements : java.lang.Iterable[Master],
rightElements: java.lang.Iterable[Detail],
out: Collector[(Master, Option[Detail]) ]): Unit = {
for (leftElem <- leftElements) {
var isMatch = false
println(leftElem.orderNo)
for (rightElem <- rightElements) {
println(rightElem.orderNo)
out.collect((leftElem, Some(rightElem)))
isMatch = true
}
if (!isMatch) {
out.collect((leftElem, None))
}
}
}
}
我和它一起跑;
m.coGroup(d)
.where(_.orderNo)
.equalTo(_.orderNo)
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
.apply(new MasterDetailOuterJoin)
.map(gson.toJson(_, classOf[(Master, Option[Detail])]))
.print
但没有什么是印刷,即使有一个匹配的主人和细节!我用console consumer监视kafka流,顺便说一句,它们运行良好。
如果我用一个内部连接来代替,我会得到结果。
m.keyBy(_.orderNo)
.connect(d.keyBy(_.orderNo))
.flatMap(new MasterDetailInnerJoin) //RichCoFlatMapFunction
.map(gson.toJson(_, classOf[(Master, Detail)]))
.print
1条答案
按热度按时间flseospp1#
原来,我缺少的是;
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
并为每个流分配时间戳和水印提取器