Exploding a JSON column in a PySpark DataFrame

b4lqfgs4 asked on 2021-07-24

I am trying to explode a JSON column in a PySpark DataFrame.
This is similar to the question pyspark dataframe with json column, aggregate json elements into a new column and remove duplicates,
but here the JSON column has a more complex structure.
DataFrame:

year  month  id  json_col
2010  08     5   {"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"}

I need new columns like this:

year  month  id  like   p_id
2010  8      5   false  dfvefvsd
2010  8      5   true   dvcdc
2010  8      5   null   cdscas

If duplicate p_id values exist within the same year, month, and id, they should be removed.
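
For anyone reproducing this, here is a minimal sketch that builds the sample DataFrame in memory (the SparkSession setup is an assumption; the original reads from a Hive table instead):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one row matching the table above; year/month are strings to keep the leading zero
row = ("2010", "08", 5,
       '{"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"}')
t = spark.createDataFrame([row], ["year", "month", "id", "json_col"])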
The code below, adapted from the question linked above (credit to @Shu):

from pyspark.sql import functions as F
from pyspark.sql.types import *

t = spark.sql('select * from my_db.my_tab')

schema = ArrayType(
    StructType([
        StructField('my_p',
                    StructType([
                        StructField('p_id', StringType(), True),
                        StructField('like', BooleanType(), True)
                    ]),
                    True),
        StructField('p_id', StringType(), True)
    ])
)

t1 = t.withColumn('a_col', F.from_json('json_col', schema)) \
      .select('year', 'month', 'id', 'p_id',
              F.expr('transform(json_col, f -> f.p_id)').alias('tmp'))

t1.groupBy('year', 'month', 'id', 'p_id') \
  .agg(F.to_json(F.array_distinct(F.flatten(F.collect_list(F.col('tmp'))))).alias('new_col')) \
  .show(10, False)

However, only the first p_id is exploded from the json_col column.
Thanks.

rkttyhzu1#

Try this:

t.show(truncate=False)

# +----+-----+---+--------------------------------------------------------------------------------------------------+
# |year|month|id |json_col                                                                                          |
# +----+-----+---+--------------------------------------------------------------------------------------------------+
# |2010|08   |5  |{"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"}|
# +----+-----+---+--------------------------------------------------------------------------------------------------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# DDL-style schema: my_p is an array of structs, plus a top-level p_id
schema1 = 'struct<my_p:array<struct<like:boolean,p_id:string>>,p_id:string>'

# one window per outer p_id; the ordering is arbitrary
w = Window().partitionBy("p_id2").orderBy(F.lit(0))

(t.withColumn("json_col", F.from_json("json_col", schema1))
  .select("*", "json_col.*").drop("json_col")
  .withColumnRenamed("p_id", "p_id2")   # keep the outer p_id apart from the inner ones
  .select("*", F.expr("inline(my_p)")).drop("my_p")  # one row per array element
  .withColumn("num", F.row_number().over(w))
  # emit the outer p_id exactly once per partition, then explode
  .withColumn("p_id", F.when(F.col("num") == 1, F.array("p_id2", "p_id"))
                       .otherwise(F.array("p_id")))
  .drop("num", "p_id2")
  .withColumn("p_id", F.explode("p_id"))
  .show())

# +----+-----+---+-----+--------+
# |year|month| id| like|    p_id|
# +----+-----+---+-----+--------+
# |2010|   08|  5| true|   dvcdc|
# |2010|   08|  5|false|dfvefvsd|
# |2010|   08|  5|false|  cdscas|
# +----+-----+---+-----+--------+
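
The question also asks to remove duplicate p_id values within the same year, month, and id, which the chain above does not do explicitly. A minimal sketch of one way to add that, assuming the chain's result is assigned to a hypothetical name result instead of calling .show() directly:

# hypothetical: `result` holds the DataFrame built by the chain above
# (everything before the final .show())
deduped = result.dropDuplicates(["year", "month", "id", "p_id"])
deduped.show()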
