Flink 流模式下的表API未将结果打印到控制台

ff29svar  于 2022-12-16  发布在  Apache
关注(0)|答案(1)|浏览(370)

您能告诉我为什么流表的分组结果没有打印到控制台的结果中吗?
未分组结果打印到控制台,没有任何问题。
Flink 版本= 1.15.0(SCALA = 2.12)(JAVA = 8)
先谢谢你。

import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}

object readKafkaStream02 extends App {

  val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    // .inBatchMode()
    .build()

  val tEnv = TableEnvironment.create(settings);

  val schema = Schema.newBuilder()
  schema.column("id", DataTypes.INT())
  schema.column("type", DataTypes.STRING())
  schema.column("amount", DataTypes.FLOAT())
  schema.column("trx_timestamp", DataTypes.TIMESTAMP(3))
  schema.watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")

  tEnv.createTemporaryTable("kafka_stream_input",  TableDescriptor.forConnector("kafka")
    .schema(schema.build())
    .format("csv")
    .option("topic","test_producer01")
    .option("properties.bootstrap.servers","localhost:9092")
    .option("properties.group.id","flink-test")
    .option("scan.startup.mode" , "latest-offset")
    .build()
  )

 /* tEnv.executeSql(
    """select * FROM TABLE
      |(
      |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
      |)
      |""".stripMargin ).print()*/  ========> works fine 

  val temp_table : Table  = tEnv.sqlQuery(
  """select  sum(amount)  , window_start , window_end from TABLE
    |(
    |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
    |) GROUP BY window_start, window_end ;
    |""".stripMargin )

  tEnv.registerTable("temp_table",temp_table)

  temp_table.execute().print() `enter code here`======> just hung and not printing any o/p to console 

 /* tEnv.executeSql(
    """select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount)  from kafka_stream_input
      |group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
      |""".stripMargin).print()*/

}
noj0wjuj

noj0wjuj1#

遵循这一原则:Flink SQL Query not returning results
按预期工作。
瓦尔t环境配置= t环境获取配置();
tEnv_config.set(“表格执行源空闲超时”,“1000毫秒”);

相关问题