我试图从[databricks][1]复制这个示例,并将其应用到kafka和spark structured streaming的新连接器中,但是我无法使用spark中现成的方法正确解析json。。。
注:本主题以json格式写入kafka。
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
下面的代码将无法工作,我相信这是因为列json是一个字符串,与来自\u json签名的方法不匹配。。。
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
有什么建议吗?
[更新]工作示例:https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/main.scala
1条答案
按热度按时间omvjsjqw1#
首先需要为json消息定义模式。例如
现在您可以在中使用此模式
from_json
方法如下。