使用spark和scala处理json rdd中的每个json记录

r3i60tvu  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(455)

我需要以下场景的帮助:
我将从kafka获取以下json格式的数据,以触发流式传输

{"id" : 1 , "data" : "AFGH00101219"}
{"id" : 2 , "data" : "AFGH00101215"}
{"id" : 2 , "data" : "AFGH00101216"}
{"id" : 3 , "data" : "AFGH00101218"}

val messages= KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

现在我要处理消息中的每个json记录,每个记录依次返回一组记录。请给我一些想法来完成下面的任务。

val output = messages.map(row =>
{
//here I will get each json record. My doubt is how to extract id and data 
//filed values from row and store it into variables.
//Here I need to decode the data filed value which is in hexa decimal format 
//to decimal format.
}

提前谢谢。如果问题不清楚,请告诉我。

5us2dqdw

5us2dqdw1#

您可以使用googlegson或任何json解析库,我使用googlegson解析spark streaming中接收到的json数据,如下所示。

// loop each RDD 
lines.foreachRDD(rawRDD => {
    val rdd = rawRDD.filter(!_.isEmpty)
      .map(row => {
        val jobj = new Gson().fromJson(row, classOf[JsonObject])
        val id = jobj.getAsJsonObject("id").getAsString
        val data = jobj.getAsJsonObject("data").getAsString
        // Do something with id and data
      })
  })

另一种方法是从接收到的rdd创建Dataframe

lines.foreachRDD(rawRDD => {
  val rdd = rawRDD.filter(!_.isEmpty)
  val df = spark.read.json(rdd)
  df.show(false)
  })

这将从rdd创建一个Dataframe,如下所示,您不能将id和数据用于任何其他转换/操作。

+------------+---+
|data        |id |
+------------+---+
|AFGH00101219|1  |
|AFGH00101215|2  |
|AFGH00101216|2  |
|AFGH00101218|3  |
+------------+---+

我希望这有帮助!

相关问题