scala—在sparkDataframe行上并行操作

ie3xauqp  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(437)

环境:scala、spark、结构化流媒体、kafka
我有一个来自Kafka流的df,模式如下
测向:

BATCH ID: 0
+-----------------------+-----+---------+------+
|                  value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}|  A  |        0|     0|
|{"big and nested json"}|  B  |        0|     0|
+-----------------------+-----+---------+------+

我想使用spark并行处理每一行,并使用spark将它们分割给执行者

DF.repartition(Number).foreach(row=> processRow(row))

我需要将value列中的值提取到它自己的Dataframe中来处理它。我在处理dataframe通用行对象时遇到困难。。
有没有办法将每个执行器中的单行转换为它自己的Dataframe(使用固定模式?)并在固定位置写入?有没有更好的方法来解决我的问题?
编辑+澄清:
df-im接收是使用 forEachBatch 自开始存在的writestream功能的函数 spark2.4 目前,将df拆分为行可以使行平均拆分为我的所有执行器,我想将单个genericrow对象转换为dataframe,以便使用我创建的函数进行处理
例如,我会将行发送到函数

processRow(row:row)

将值和主题转换回一行df

+-----------------------+-----+
|                  value|topic|
+-----------------------+-----+
|{"big and nested json"}|  A  |
+-----------------------+-----+

用于进一步处理

ffvjumwh

ffvjumwh1#

在这种情况下,它更适合使用 .map 而不是 .foreach . 原因是 map 返回一个新的数据集,同时 foreach 只是一个函数,不返回任何东西。
另一件可以帮助您的事情是解析json中的模式。
我最近也有类似的要求。我的json对象对这两个主题都有一个“相似”的模式 A 以及 B . 如果不是这样,您可能需要创建多个 dataframes 在下面的解决方案中,按主题对它们进行分组。

val sanitiseJson: String => String = value => value
  .replace("\\\"", "\"")
  .replace("\\\\", "\\")
  .replace("\n", "")
  .replace("\"{", "{")
  .replace("}\"", "}")

val parsed = df.toJSON
  .map(sanitiseJson)

这会给你一些类似于:

{
    "value": { ... },
    "topic": "A"
}

然后你可以把它传给一个新的 read 功能:

var dfWithSchema = spark.read.json(parsed)

此时,您将访问嵌套json中的值:

dfWithSchema.select($"value.propertyInJson")

当涉及到 sanitiseJson 如果需要的话。

ux6nzvsh

ux6nzvsh2#

我猜你一次要消耗多个Kafka数据。
首先你需要准备 schema 对于所有的Kafka主题,这里举个例子,我在value列中使用了两个不同的json。

scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value              |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A    |
|{"age":20}         |B    |
+-------------------+-----+
scala> import org.apache.spark.sql.types._

主题a的架构

scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

主题b的架构

scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

结合主题及其模式。

scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.

正在处理Dataframe

scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value     |topic|
+----------+-----+
|[Srinivas]|A    |
+----------+-----+

+-----+-----+
|value|topic|
+-----+-----+
|[20] |B    |
+-----+-----+

写入hdfs

scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))

最终数据存储在hdfs中

scala> import sys.process._
import sys.process._

scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│   ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│   └── _SUCCESS
└── B
    ├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
    └── _SUCCESS

2 directories, 4 files

相关问题