flink+kafka 0.10:如何创建一个以kafka消息时间戳为字段的表?

cgfeq70w  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(407)

我想提取 FlinkKafkaConsumer010 作为数据流中的值。
我知道 AssignerWithPeriodicWatermarks 类,但这似乎只是为了通过 DataStream 应用程序编程接口。
我想让Kafka的消息时间戳在 Table 所以以后,我可以在上面使用sql。
编辑:尝试了以下操作:

val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties)
  consumer.setStartFromEarliest()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tenv = TableEnvironment.getTableEnvironment(env)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] {
    var maxTs = 0L
    override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
      maxTs = Math.max(maxTs, previousElementTimestamp)
      previousElementTimestamp
    }
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L)
  }

  val stream = env
    .addSource(consumer)
    .assignTimestampsAndWatermarks(new KafkaAssigner[String])
    .flatMap(_.split("\\W+"))

  val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime)

它编译,但抛出:

Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested.
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)

在上面代码的最后一行。
edit2:感谢@fabian hueske为我指出了一个解决方法。完整代码位于https://github.com/andrey-savov/flink-kafka

w46czmvw

w46czmvw1#

如果配置了时间特性eventtime,flink的kafka 0.10消费者会自动将kafka消息的时间戳设置为生成记录的事件时间时间戳(参见docs)。
在你把Kafka的主题吸收到 DataStream 通过指定时间戳(仍然不可见)和水印,可以将其转换为 TableStreamTableEnvironment.fromDataStream(stream, fieldExpr*) 方法。这个 fieldExpr* 参数是描述所生成表的架构的表达式列表。您可以添加一个字段,用表达式保存流的记录时间戳 mytime.rowtime ,在哪里 mytime 是新字段的名称 rowtime 指示从记录时间戳中提取值。请检查文件以了解详情。
注:正如@bfair所指出的 DataStream 原子类型的(例如 DataStream[String] )在flink 1.3.2和更早版本中出现异常时失败。据报道,这个错误名为flink-7939,将在下一个版本中修复。

相关问题