Using a PySpark DataFrame to look up a value in one array by index and copy it into another array

h43kikqp posted on 2021-07-13 in Spark

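In this DataFrame I have two arrays: discount_applications and line_items. Each element of line_items contains an inner array called discount_allocations, and each element of that inner array has a field called discount_application_index. The task is to use the discount_application_index value to find the "type" value at that position in the discount_applications array and copy it into the corresponding application_type field.
Here is the DataFrame: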
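Stripped of Spark, the lookup for a single record is plain list indexing. The following is a minimal sketch in plain Python, assuming one record has been parsed from JSON into nested dicts and lists with the same layout as the _c struct; the sample record and the helper name fill_application_types are hypothetical, and the indices are deliberately swapped to show that the value is looked up by index rather than copied by position.

```python
import json

# Hypothetical sample shaped like the "_c" struct in the question
record = json.loads(
    '{"discount_applications": [{"type": "manual0"}, {"type": "manual1"}],'
    ' "line_items": ['
    '{"discount_allocations": [{"application_type": "", "discount_application_index": 1}]},'
    '{"discount_allocations": [{"application_type": "", "discount_application_index": 0}]}]}'
)


def fill_application_types(c):
    """Copy discount_applications[i]["type"] into every allocation whose index is i (mutates c)."""
    apps = c["discount_applications"]
    for item in c["line_items"]:
        for alloc in item["discount_allocations"]:
            idx = alloc["discount_application_index"]
            alloc["application_type"] = apps[idx]["type"]
    return c


fill_application_types(record)
print([a["application_type"]
       for item in record["line_items"]
       for a in item["discount_allocations"]])
# prints ['manual1', 'manual0']
```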

records = '[{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"},{"type":"manual3"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]},{"discount_allocations":[{"application_type":"","discount_application_index":3}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}}]'
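from pyspark.sql import SparkSession

# In the pyspark shell `spark` and `sc` already exist; create them when running as a script
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

df = spark.read.json(sc.parallelize([records]))
df.show(truncate=False)
df.printSchema()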

root
 |-- _c: struct (nullable = true)
 |    |-- discount_applications: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- discount_allocations: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- application_type: string (nullable = true)
 |    |    |    |    |    |-- discount_application_index: long (nullable = true)

+--------------------------------------------------------------------------------------------+
|_c                                                                                          |
+--------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[, 0]]], [[[, 1]]], [[[, 2]]], [[[, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
+--------------------------------------------------------------------------------------------+

After the transformation, the DataFrame should look like this:

+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+
Answer #1, by 3vpjnl9f

Get your head around the nesting, then use transform :)

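import pyspark.sql.functions as F

# Rebuild the _c struct: keep discount_applications as-is and, for every
# element of every line_items[].discount_allocations array, look up
# _c.discount_applications[discount_application_index].type.
# transform() is a SQL higher-order function (available since Spark 2.4).
df2 = df.withColumn(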
    '_c', 
    F.expr("""
        struct(
            _c.discount_applications,
            transform(
                _c.line_items,
                x -> struct(
                    transform(
                        x.discount_allocations,
                        y -> struct(
                            _c.discount_applications[int(y.discount_application_index)].type as application_type,
                            y.discount_application_index as discount_application_index
                        )
                    ) as discount_allocations
                )
            ) as line_items
        )
    """)
)

df2.show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+
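The nested transform calls behave like nested list comprehensions that build new structs instead of mutating the old ones, and the inner lambda can read _c.discount_applications because it sees the enclosing row. Below is a rough plain-Python analogue, assuming the same dict layout as the JSON in the question; transform, lookup_type and rebuild_c are hypothetical names, not Spark APIs. Returning None for an out-of-range index is meant to mirror Spark's null result when ANSI mode is off (the default before Spark 4.0); with ANSI mode on, the array lookup raises an error instead.

```python
def transform(arr, f):
    """Analogue of Spark SQL's transform(array, x -> f(x)); a null array stays null."""
    return None if arr is None else [f(x) for x in arr]


def lookup_type(apps, idx):
    # Like apps[idx].type in Spark SQL with ANSI mode off: null or out of range -> None
    if apps is None or idx is None or not 0 <= idx < len(apps):
        return None
    return apps[idx]["type"]


def rebuild_c(c):
    """Return a new _c dict; the input is left untouched, like withColumn returning a new DataFrame."""
    apps = c["discount_applications"]
    return {
        "discount_applications": apps,
        "line_items": transform(
            c["line_items"],
            lambda x: {
                "discount_allocations": transform(
                    x["discount_allocations"],
                    # The inner lambda closes over `apps`, just as the SQL lambda reads _c
                    lambda y: {
                        "application_type": lookup_type(apps, y["discount_application_index"]),
                        "discount_application_index": y["discount_application_index"],
                    },
                )
            },
        ),
    }


c = {
    "discount_applications": [{"type": "manual0"}, {"type": "manual1"}],
    "line_items": [
        {"discount_allocations": [{"application_type": "", "discount_application_index": 1}]},
        {"discount_allocations": [{"application_type": "", "discount_application_index": 5}]},
    ],
}
new_c = rebuild_c(c)
print([a["application_type"] for li in new_c["line_items"] for a in li["discount_allocations"]])
# prints ['manual1', None]
```

Because the original dict is never modified, c still holds empty application_type strings after the call, which matches the way df keeps its original _c column while df2 gets the rebuilt one.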
