kafka主题激发流式数据流,如何获得json

wxclj1h5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(376)

我试图通过spark streaming从一个kafka主题中获取信息,然后解析主题中得到的json。为了在数据流中获取主题,我使用stringreader,然后使用foreach从数据流中获取每个rdd:

myRDD.collect().foreach(println)

为了将myrdd转换成json(当我打印myrdd时,json的格式是正确的)并提取我需要的两个字段,我尝试使用json4s并尝试了以下方法:

val jsonEvent = Json.parse(myRDD)
val srcIp = (jsonEvent / "src_ip")
val dstIp =  (jsonEvent / "dst_ip")

我也尝试过这样使用json4s:

val jsonEvent = parse(myRDD).asInstanceOf[JObject]
val srcIp = jsonEvent / "src_ip"

但它也不能正常工作。
这是输出:

java.lang.NoSuchMethodError: rg.json4s.jackson.JsonMethods$.parse$default$3()Z

以下是我正在使用的版本:

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-native_2.10</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.10</artifactId>
    <version>3.5.1</version>
</dependency>

我认为问题是我不明白如何将rdd中的每条记录转换成json对象来解析它。有没有人能更深入地向我解释一下,让我明白它是怎么工作的?我的代码正确吗?
谢谢您。

ycggw6v2

ycggw6v21#

spark为您提供了将输入json读取到数据集/Dataframe的api。
如果您正在从文件中读取json。你可以使用
SparkSession read().json() 方法

SparkSession spark = SparkSession.builder().appName("Test App").master("local[*]")
                .getOrCreate();

Dataset<Row> inputfileDataset = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
            .json("C:\path-to-file\inputfile.json");

如果你在读Kafka的作品。您可以通过将每个rdd转换为 ListString 然后将它们转换为数据集/Dataframe

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

JavaDStream<String> inputJsonStream = directKafkaStream.map(rdd -> {
    return rdd._2;
});

inputJsonStream.foreachRDD(inputRDD -> {
    SparkSession spark = JavaSparkSessionSingleton.getInstance(inputRDD.context().getConf());
    List<String> strings = inputRDD.collect();
    strings.forEach(x -> {
        Dataset<Row> inputDataset = spark.read().option("multiLine",true).option("mode", "PERMISSIVE").json(inputRDD);
        inputDataset.printSchema();
});

对于查询数据集/Dataframe,可以使用 Select() 函数,然后将其转换为所需的数据类型。

相关问题