apache-flink使用cogroup实现左外连接

6ie5vjzr  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(510)

我试着用 CoGroupFunction 在Flink。
我有两条小溪;它们是:;
s1级

val m = env
.addSource(new FlinkKafkaConsumer010[String]("topic-1", schema, props))
.map(gson.fromJson(_, classOf[Master]))
.assignAscendingTimestamps(_.time)

s2级

val d = env
.addSource(new FlinkKafkaConsumer010[String]("topic-2", schema, props))
.map(gson.fromJson(_, classOf[Detail]))
.assignAscendingTimestamps(_.time)

还有我的 coGroup 实施是;

class MasterDetailOuterJoin extends CoGroupFunction[Master, Detail, 
(Master, Option[Detail])] {

  override def coGroup(
      leftElements : java.lang.Iterable[Master],
      rightElements: java.lang.Iterable[Detail],
      out: Collector[(Master, Option[Detail]) ]): Unit = {

    for (leftElem <- leftElements) {
      var isMatch = false
      println(leftElem.orderNo)
      for (rightElem <- rightElements) {
        println(rightElem.orderNo)
        out.collect((leftElem, Some(rightElem)))
        isMatch = true
      }
      if (!isMatch) {
        out.collect((leftElem, None))
      }
    }
  }
}

我和它一起跑;

m.coGroup(d)
    .where(_.orderNo)
    .equalTo(_.orderNo)
    .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
    .apply(new MasterDetailOuterJoin)
    .map(gson.toJson(_, classOf[(Master, Option[Detail])]))
    .print

但没有什么是印刷,即使有一个匹配的主人和细节!我用console consumer监视kafka流,顺便说一句,它们运行良好。
如果我用一个内部连接来代替,我会得到结果。

m.keyBy(_.orderNo)
    .connect(d.keyBy(_.orderNo))
    .flatMap(new MasterDetailInnerJoin) //RichCoFlatMapFunction
    .map(gson.toJson(_, classOf[(Master, Detail)]))
    .print
flseospp

flseospp1#

原来,我缺少的是; env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 并为每个流分配时间戳和水印提取器

相关问题