Table cannot be converted into a DataSet. It is not part of a batch table environment

4xrmg8kj · posted on 2021-06-21 in Flink

I have the following code, with which I want to try out batch processing with Flink SQL:

package org.example.sql

import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object Sql021_PlannerOldBatchTest {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

    val env = TableEnvironment.create(settings)

    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.BIGINT())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.STRING())

    env.connect(new FileSystem().path("D:\\stock.csv"))
      .withSchema(schema)
      .withFormat(fmt)
      .createTemporaryTable("sourceTable")

    env.sqlQuery("select * from sourceTable").collect().foreach(println)

    env.execute("")
  }

}

When I run the program, the following exception is thrown, and I don't know where the problem is:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Table cannot be converted into a DataSet. It is not part of a batch table environment.
    at org.apache.flink.table.api.bridge.scala.package$.table2RowDataSet(package.scala:70)
    at org.example.sql4.Sql021_PlannerOldBatchTest$.main(Sql021_PlannerOldBatchTest.scala:25)
    at org.example.sql4.Sql021_PlannerOldBatchTest.main(Sql021_PlannerOldBatchTest.scala)
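
The stack trace shows that collect() is being resolved through the implicit table2RowDataSet conversion from org.apache.flink.table.api.bridge.scala, which converts the Table into a DataSet and therefore needs a batch table environment, as the exception message says. For reference only, here is a minimal sketch (assuming Flink 1.11+ with the Blink planner; the object name and the in-memory sample table are placeholders, not part of the original program) that collects query results through Table.execute().collect() on the unified TableEnvironment instead of going through the DataSet bridge:

package org.example.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIterator

// Minimal sketch, assuming Flink 1.11+ with the Blink planner.
// The object name and the sample data below are placeholders.
object CollectViaTableResultSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    // A tiny in-memory table stands in for the CSV-backed "sourceTable" from the question.
    tEnv.createTemporaryView("sourceTable", tEnv.fromValues(1L, 2L, 3L))

    // Collect through a TableResult instead of the Scala DataSet bridge,
    // so no conversion to a DataSet is attempted.
    val rows: CloseableIterator[Row] = tEnv.sqlQuery("select * from sourceTable").execute().collect()
    try {
      while (rows.hasNext) println(rows.next())
    } finally {
      rows.close()
    }
  }
}

With this pattern the query is submitted by execute() itself, so a separate env.execute("") call is not needed.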
