我正在处理数据库突变流,即更改日志流。我希望能够使用sql查询转换值。我很难把以下三个概念放在一起 RowTypeInfo
, Row
,和 DataStream
.
注意:我事先不知道模式。我使用 Mutation
对象( Mutation
是自定义类型)
更具体地说,我有这样的代码。
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")
注: Row
对象是位置性的,没有列名占位符。我找不到将架构附加到 DataStream
对象。
我想传递一个类似于 Row
包含完整信息的 {columnName: String, columnValue: Object, columnType: TypeInformation[_]}
用于查询。
1条答案
按热度按时间polhcujo1#
在flinksql中,当
Table
定义。无法对动态类型的记录运行查询。关于
RowTypeInfo
,Row
以及DataStream
:Row
是保存数据的实际记录RowTypeInfo
是的架构描述Row
s。它包含名称和TypeInformation
对于每个字段Row
.DataStream
是一个逻辑记录流。一DataStream[Row]
是一排排。请注意,这不是实际的流,而只是表示api中的流的api概念。