环境:scala、spark、结构化流媒体、kafka
我有一个来自Kafka流的df,模式如下
测向:
BATCH ID: 0
+-----------------------+-----+---------+------+
| value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}| A | 0| 0|
|{"big and nested json"}| B | 0| 0|
+-----------------------+-----+---------+------+
我想使用spark并行处理每一行,并使用spark将它们分割给执行者
DF.repartition(Number).foreach(row=> processRow(row))
我需要将value列中的值提取到它自己的Dataframe中来处理它。我在处理dataframe通用行对象时遇到困难。。
有没有办法将每个执行器中的单行转换为它自己的Dataframe(使用固定模式?)并在固定位置写入?有没有更好的方法来解决我的问题?
编辑+澄清:
df-im接收是使用 forEachBatch
自开始存在的writestream功能的函数 spark2.4
目前,将df拆分为行可以使行平均拆分为我的所有执行器,我想将单个genericrow对象转换为dataframe,以便使用我创建的函数进行处理
例如,我会将行发送到函数
processRow(row:row)
将值和主题转换回一行df
+-----------------------+-----+
| value|topic|
+-----------------------+-----+
|{"big and nested json"}| A |
+-----------------------+-----+
用于进一步处理
2条答案
按热度按时间ffvjumwh1#
在这种情况下,它更适合使用
.map
而不是.foreach
. 原因是map
返回一个新的数据集,同时foreach
只是一个函数,不返回任何东西。另一件可以帮助您的事情是解析json中的模式。
我最近也有类似的要求。我的json对象对这两个主题都有一个“相似”的模式
A
以及B
. 如果不是这样,您可能需要创建多个dataframes
在下面的解决方案中,按主题对它们进行分组。这会给你一些类似于:
然后你可以把它传给一个新的
read
功能:此时,您将访问嵌套json中的值:
当涉及到
sanitiseJson
如果需要的话。ux6nzvsh2#
我猜你一次要消耗多个Kafka数据。
首先你需要准备
schema
对于所有的Kafka主题,这里举个例子,我在value列中使用了两个不同的json。主题a的架构
主题b的架构
结合主题及其模式。
正在处理Dataframe
写入hdfs
最终数据存储在hdfs中