自定义项中scala中sparkDataframe的listtype、maptype、structtype字段的一般处理?

qni6mghb  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(402)

如何在scala中对spark structtype执行常规处理,如按名称选择字段、遍历map/list字段等?
在spark dataframe中,我有类型为“arraytype”的列“instances”,其模式如下:

instances[ArrayType]:
    0 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 5
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  12
                        grade[IntType]]
     1 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 3
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  9
                        grade[IntType]]

我需要将此arraytype列“instances”转换为类型为的列“totalexperience”

derived column "totalExperience" of type "MapType"[StringType -> IntType]
company-1: 8
company-2: 21

注:(5+3=8和12+9=21)
等效psuedo代码:

totalExperience = Map<String, Int>();
for (instance in instances) {
    for ((currentExperience, numYears) in instance.getExperiences().entries()) {
         if (!totalExperience.contains(currentExperience)) {
              totalExperience.put(currentExperience, 0);
         }

         totalExperience.put(currentExperience, totalExperience.get(currentExperience) + numYears);
    }
}

return totalExperience

我为此编写了udf,如下所示,但我没有找到在scala spark中实现上述伪代码的任何方法:

private val computeTotalExperience = udf(_ => MapType = (instances: ArrayType) => {
    val totalExperienceByCompany = DataTypes.createMapType(StringType, LongType)

  **How to iterate over "instances" with type as "ArrayType" ?**
    for (instance <- instances) {
    **How to access and iterate over "experiences" mapType field on instance ???**
      // Populate totalExperienceByCompany(MapType) with key as "company-1" name

    }

    delayReasons
  })

如何对自定义项中scala中sparkDataframe的listtype、maptype、structtype字段进行上述一般处理?

rkue9o1l

rkue9o1l1#

检查以下代码。

scala> df.printSchema
root
 |-- instances: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- experiences: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |    |-- numYears: integer (nullable = true)
 |    |    |    |    |-- grade: string (nullable = true)
 |    |    |-- name: string (nullable = true)
scala> df.show(false)
+-----------------------------------------------------------------------------------------------------------------------------------+
|instances                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------+
|[[address_0, [Company-1 -> [5, 1], Company-2 -> [12, 1]], name_0], [address_1, [Company-1 -> [3, 1], Company-2 -> [9, 1]], name_1]]|
+-----------------------------------------------------------------------------------------------------------------------------------+
scala> 
val expr = array(
    struct(lit("company-1").as("company"),$"instance.experiences.Company-1.numYears"),
    struct(lit("company-2").as("company"),$"instance.experiences.Company-2.numYears")
)
scala>  

df
.withColumn("instance",explode($"instances"))
.withColumn("company",explode(expr))
.select("company.*")
.groupBy($"company")
.agg(sum($"numYears").as("numYears"))
.select(map($"company",$"numYears").as("totalExperience"))
.show(false) 

+-----------------+                                                                                                                                                                                
|totalExperience  |                                                                                                                                                                                
+-----------------+                                                                                                                                                                                
|[company-1 -> 8] |                                                                                                                                                                                
|[company-2 -> 21]|                                                                                                                                                                                
+-----------------+

相关问题