您能告诉我为什么流表的分组结果没有打印到控制台的结果中吗?
未分组结果打印到控制台,没有任何问题。
Flink 版本= 1.15.0(SCALA = 2.12)(JAVA = 8)
先谢谢你。
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}
object readKafkaStream02 extends App {
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
// .inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings);
val schema = Schema.newBuilder()
schema.column("id", DataTypes.INT())
schema.column("type", DataTypes.STRING())
schema.column("amount", DataTypes.FLOAT())
schema.column("trx_timestamp", DataTypes.TIMESTAMP(3))
schema.watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")
tEnv.createTemporaryTable("kafka_stream_input", TableDescriptor.forConnector("kafka")
.schema(schema.build())
.format("csv")
.option("topic","test_producer01")
.option("properties.bootstrap.servers","localhost:9092")
.option("properties.group.id","flink-test")
.option("scan.startup.mode" , "latest-offset")
.build()
)
/* tEnv.executeSql(
"""select * FROM TABLE
|(
|TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
|)
|""".stripMargin ).print()*/ ========> works fine
val temp_table : Table = tEnv.sqlQuery(
"""select sum(amount) , window_start , window_end from TABLE
|(
|TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
|) GROUP BY window_start, window_end ;
|""".stripMargin )
tEnv.registerTable("temp_table",temp_table)
temp_table.execute().print() `enter code here`======> just hung and not printing any o/p to console
/* tEnv.executeSql(
"""select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount) from kafka_stream_input
|group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
|""".stripMargin).print()*/
}
1条答案
按热度按时间noj0wjuj1#
遵循这一原则:Flink SQL Query not returning results
按预期工作。
瓦尔t环境配置= t环境获取配置();
tEnv_config.set(“表格执行源空闲超时”,“1000毫秒”);