sparkstreaming:directstream rdd到Dataframe

beq87vna  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(329)

这个问题在这里已经有答案了

集成spark结构化流与融合模式注册表(7个答案)
两年前关门了。
我正在研究spark流上下文,它从avro序列化中的kafka主题获取数据,如下所示。

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)

使用kafka utils,我创建了如下的直接流

val topics = Set("mysql-foobar")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](
    topics,
    kafkaParams)
)

我还将数据作为

stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

现在我想从这些rdd创建Dataframe。有没有可能我已经审查和测试了许多解决方案,从stackoverflow,但得到一些问题与他们。stackoverflow的解决方案是this和this。我的输出如下

{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
mwg9r5ms

mwg9r5ms1#

由于您使用的是合流序列化程序,而且它们目前不提供与spark的轻松集成,因此您可以通过absaoss在github上 checkout 一个相对较新的库来帮助实现这一点。
但基本上,您使用spark结构化流来获取Dataframe,不要尝试使用数据流到rdd到Dataframe。。。
你可以在这里找到你要找的东西的例子
另请参阅将spark结构化流与kafka模式注册表集成的其他示例

相关问题