flink cassandrasink.addsink(输入)类型不匹配

7vux5j2d  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(353)

我是Flink的新手。我正在尝试使用flink1.3.2来读取我们的kinesis流,并将输出写入cassandra表。这个程序能够从动觉中输入数据。
问题是,当我执行“cassandrasink.addsink(countsstreaming)”时,它会导致“类型不匹配”。应为:datastream[notinferredin],实际为:datastream[(string,long)]'。我浏览了文档和源代码,注意到addsink接受datastream[in]。
有人能帮我理解“in”类型是什么以及如何解决这个问题吗?
提前谢谢!

val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "kinesis-stream", new SimpleStringSchema, ConsumerConfig))

//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
  .map(x => x.get("game_name"))
  .map({x => (x,1L) })
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

countsStreaming.print()

CassandraSink.addSink(countsStreaming)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
  override def buildCluster(builder: Cluster.Builder): Cluster = {
    builder.addContactPoint("0.0.0.0").build()
  }
}).build()

env.execute("StreamingExample")
zlwx9yxi

zlwx9yxi1#

问题是 CassandraSink.addSink 只接受java数据流。
你需要加上 .javaStream 在scala数据流之后,类型不匹配应该消失了。

相关问题