我想提取 FlinkKafkaConsumer010
作为数据流中的值。
我知道 AssignerWithPeriodicWatermarks
类,但这似乎只是为了通过 DataStream
应用程序编程接口。
我想让Kafka的消息时间戳在 Table
所以以后,我可以在上面使用sql。
编辑:尝试了以下操作:
val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties)
consumer.setStartFromEarliest()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] {
var maxTs = 0L
override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
maxTs = Math.max(maxTs, previousElementTimestamp)
previousElementTimestamp
}
override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L)
}
val stream = env
.addSource(consumer)
.assignTimestampsAndWatermarks(new KafkaAssigner[String])
.flatMap(_.split("\\W+"))
val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime)
它编译,但抛出:
Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
在上面代码的最后一行。
edit2:感谢@fabian hueske为我指出了一个解决方法。完整代码位于https://github.com/andrey-savov/flink-kafka
1条答案
按热度按时间w46czmvw1#
如果配置了时间特性eventtime,flink的kafka 0.10消费者会自动将kafka消息的时间戳设置为生成记录的事件时间时间戳(参见docs)。
在你把Kafka的主题吸收到
DataStream
通过指定时间戳(仍然不可见)和水印,可以将其转换为Table
与StreamTableEnvironment.fromDataStream(stream, fieldExpr*)
方法。这个fieldExpr*
参数是描述所生成表的架构的表达式列表。您可以添加一个字段,用表达式保存流的记录时间戳mytime.rowtime
,在哪里mytime
是新字段的名称rowtime
指示从记录时间戳中提取值。请检查文件以了解详情。注:正如@bfair所指出的
DataStream
原子类型的(例如DataStream[String]
)在flink 1.3.2和更早版本中出现异常时失败。据报道,这个错误名为flink-7939,将在下一个版本中修复。