我有以下代码,我想试试flink sql的批处理:
package org.example.sql
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object Sql021_PlannerOldBatchTest {
def main(args: Array[String]): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val env = TableEnvironment.create(settings)
val fmt = new Csv().fieldDelimiter(',').deriveSchema()
val schema = new Schema()
.field("a", DataTypes.BIGINT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
env.connect(new FileSystem().path("D:\\stock.csv")).withSchema(schema).withFormat(fmt).createTemporaryTable("sourceTable")
env.sqlQuery("select * from sourceTable").collect().foreach(println)
env.execute("")
}
}
当我运行程序时,会引发以下异常,我不知道问题出在哪里:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Table cannot be converted into a DataSet. It is not part of a batch table environment.
at org.apache.flink.table.api.bridge.scala.package$.table2RowDataSet(package.scala:70)
at org.example.sql4.Sql021_PlannerOldBatchTest$.main(Sql021_PlannerOldBatchTest.scala:25)
at org.example.sql4.Sql021_PlannerOldBatchTest.main(Sql021_PlannerOldBatchTest.scala)
暂无答案!
目前还没有任何答案,快来回答吧!