在Pyspark中将词典列表转换为json

t8e9dugd  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(216)

我有 Dataframe ,我得到列deskr作为字典列表:

df = spark.createDataFrame(
    [
        (2022,'A1', "cat", 'eng', 3, 56.768639), 
         (2022,'A1', "rabbit", 'eng', 10, 56.768639), 
         (2022, 'A2', "dog", 'eng', 10, 54.114841),
           (2022, 'A2', "mouse", 'eng', 20, 81.114841),
    ],
    ["data",'group', "word", 'lang', 'count', 'value']  # add your column names here
)
df2 = df
    .groupBy('data', 'group', 'lang')
    .agg(F.collect_list(F.to_json(F.struct(F.col('count'), F.col('value'), F.col('word')))).alias('descr'))

我想用PANDAS_UDF将DICT列表转换为JSON字符串:

@pandas_udf(StringType())
def jsn(lst):
    return lst.apply(lambda lst: base64.b64encode(gzip.compress(json.dumps(lst).encode('utf-8'))).decode("utf-8"))

df3= df2.withColumn('descr2',
                        jsn(F.col('descr')))

但我弄错了:

TypeError:ndarray类型的对象不可JSON序列化

xpcnnkqh

xpcnnkqh1#

您传递的是JSON对象数组,而不是JSON字符串。试着这样做:

.agg(
    F.to_json(
        F.collect_list(F.struct(F.col('count'), F.col('value'), F.col('word')))
    ).alias('descr')
)

您需要对收集的结构列表应用to_json函数。

相关问题