如何动态地将模式附加到flink数据流?

g52tjvyc  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(349)

我正在处理数据库突变流,即更改日志流。我希望能够使用sql查询转换值。我很难把以下三个概念放在一起 RowTypeInfo , Row ,和 DataStream .
注意:我事先不知道模式。我使用 Mutation 对象( Mutation 是自定义类型)
更具体地说,我有这样的代码。

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")

注: Row 对象是位置性的,没有列名占位符。我找不到将架构附加到 DataStream 对象。
我想传递一个类似于 Row 包含完整信息的 {columnName: String, columnValue: Object, columnType: TypeInformation[_]} 用于查询。

polhcujo

polhcujo1#

在flinksql中,当 Table 定义。无法对动态类型的记录运行查询。
关于 RowTypeInfo , Row 以及 DataStream : Row 是保存数据的实际记录 RowTypeInfo 是的架构描述 Row s。它包含名称和 TypeInformation 对于每个字段 Row . DataStream 是一个逻辑记录流。一 DataStream[Row] 是一排排。请注意,这不是实际的流,而只是表示api中的流的api概念。

相关问题