如何将所有行作为json数组写入kafka流Dataframe?

nzkunb0c  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(462)

我正在寻找一个解决方案,写的Spark流数据到Kafka。我使用以下方法将数据写入Kafka

df.selectExpr("to_json(struct(*)) AS value").writeStream.format("kafka")

但我的问题是,在给Kafka写信时,数据显示如下

{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}

我的预期产出是

[
    {"country":"US","plan":postpaid,"value":300}
    {"country":"CAN","plan":0.0,"value":30}
   ]

我想把数组中的行括起来。如何在spark流媒体中实现同样的效果?有人能给点建议吗

thigvfpy

thigvfpy1#

我真的不确定这是否可行,但我还是会把我的建议贴在这里;因此,您可以在以后转换Dataframe:

//Input  
 inputDF.show(false)
 +---+-------+
 |int|string |
 +---+-------+
 |1  |string1|
 |2  |string2|
 +---+-------+

 //convert that to json
 inputDF.toJSON.show(false)
 +----------------------------+
 |value                       |
 +----------------------------+
 |{"int":1,"string":"string1"}|
 |{"int":2,"string":"string2"}|
 +----------------------------+

 //then use collect and mkString
 println(inputDF.toJSON.collect().mkString("[", "," , "]"))
 [{"int":1,"string":"string1"},{"int":2,"string":"string2"}]
vlju58qv

vlju58qv2#

我假设流Dataframe的模式( df )具体如下:

root
 |-- country: string (nullable = true)
 |-- plan: string (nullable = true)
 |-- value: string (nullable = true)

我还假设您希望写入(生成)流Dataframe中的所有行( df )以单个记录的形式输出到kafka主题,其中的行是json数组的形式。
如果是这样,你应该 groupBy 排成一排 collect_list 将所有行组合成一行,您可以将其写出给Kafka。

// df is a batch DataFrame so I could show for demo purposes
scala> df.show
+-------+--------+-----+
|country|    plan|value|
+-------+--------+-----+
|     US|postpaid|  300|
|    CAN|     0.0|   30|
+-------+--------+-----+

val jsons = df.selectExpr("to_json(struct(*)) AS value")
scala> jsons.show(truncate = false)
+------------------------------------------------+
|value                                           |
+------------------------------------------------+
|{"country":"US","plan":"postpaid","value":"300"}|
|{"country":"CAN","plan":"0.0","value":"30"}     |
+------------------------------------------------+

val grouped = jsons.groupBy().agg(collect_list("value") as "value")
scala> grouped.show(truncate = false)
+-----------------------------------------------------------------------------------------------+
|value                                                                                          |
+-----------------------------------------------------------------------------------------------+
|[{"country":"US","plan":"postpaid","value":"300"}, {"country":"CAN","plan":"0.0","value":"30"}]|
+-----------------------------------------------------------------------------------------------+

我会在datastreamwriter.foreachbatch中执行以上所有操作,以获得要处理的Dataframe。

相关问题