Error when using the Table API (with DataStreams) in Flink

j8ag8udp · posted 2021-06-24 in Flink

I'm using Flink 1.9.0 and cannot import or resolve the Table API classes.
I've tried several different SBT dependencies related to it.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

// enclosing object implied by the trailing closing brace; the name is arbitrary
object TemperatureJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val tempSource = CsvTableSource.builder()
      .path("/home/amulya/Desktop/csvForBroadcast/CSV.csv")
      .fieldDelimiter(",")
      .field("locationID", Types.STRING())
      .field("temp", Types.DOUBLE())
      .build()

    tEnv.registerTableSource("Temperatures", tempSource)

    // filter on the registered column name ("temp", not "Temperature")
    val askTable = tEnv
      .scan("Temperatures")
      .where("temp >= 50")
      .select("locationID, temp")

    tEnv.toAppendStream[Events](askTable).print()
    env.execute()
  }

  // field types must match the table schema: temp is DOUBLE, not Long
  case class Events(locationID: String, temp: Double)
}

I have simple CSV data in the following format:

locationID,temp
"1",25
"2",25
"3",35
"4",45
"5",55

This is the error:

Error:scalac: missing or invalid dependency detected while loading class file 'ScalaCaseClassSerializer.class'.
Could not access type SelfResolvingTypeSerializer in object org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'ScalaCaseClassSerializer.class' was compiled against an incompatible version of org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.

I'm trying to run CEP on this basic data as a way to get started with Apache Flink. Any help would be greatly appreciated.
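This compiler error usually indicates that the Flink artifacts on the classpath come from mixed or incompatible versions, so pinning every Flink dependency to the same version in `build.sbt` is the usual fix. A minimal sketch for Flink 1.9.0 (the Scala version and exact artifact set are assumptions, not taken from the original post):

```scala
// build.sbt — minimal sketch; assumes Scala 2.11, which Flink 1.9 supports
scalaVersion := "2.11.12"

// keep every Flink artifact pinned to one version to avoid the
// "incompatible TypeSerializerConfigSnapshot" classpath error
val flinkVersion = "1.9.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner"          % flinkVersion
)
```

After changing the dependencies, a full rebuild (`sbt clean compile`) is worth running, as the error message itself suggests.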


0kjbasz6 · answer #1

Try the flink-table-api-java-bridge.
At the moment, the Scala bridge module only provides basic conversions between tables and DataSet/DataStream programs.
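In sbt, the suggestion above would look like the following (the coordinates are an assumption for Flink 1.9.0; `%%` appends the Scala suffix, e.g. `_2.11`):

```scala
// build.sbt fragment — assumed coordinates for the Java Table API bridge
libraryDependencies += "org.apache.flink" %% "flink-table-api-java-bridge" % "1.9.0"
```

With the Java bridge, conversions pass the target type explicitly rather than relying on Scala implicits, e.g. `tEnv.toAppendStream(askTable, classOf[Row])` on a `org.apache.flink.table.api.java.StreamTableEnvironment`.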
