我正在寻找一个解决方案,写的Spark流数据到Kafka。我使用以下方法将数据写入Kafka
df.selectExpr("to_json(struct(*)) AS value").writeStream.format("kafka")
但我的问题是,在给Kafka写信时,数据显示如下
{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}
我的预期产出是
[
{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}
]
我想把数组中的行括起来。如何在spark流媒体中实现同样的效果?有人能给点建议吗
2条答案
按热度按时间thigvfpy1#
我真的不确定这是否可行,但我还是会把我的建议贴在这里;因此,您可以在以后转换Dataframe:
vlju58qv2#
我假设流Dataframe的模式(
df
)具体如下:我还假设您希望写入(生成)流Dataframe中的所有行(
df
)以单个记录的形式输出到kafka主题,其中的行是json数组的形式。如果是这样,你应该
groupBy
排成一排collect_list
将所有行组合成一行,您可以将其写出给Kafka。我会在datastreamwriter.foreachbatch中执行以上所有操作,以获得要处理的Dataframe。