如何为具有许多列的sparkDataframe定义模式

cig3rfwq  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(328)

我有Spark in_df 有300多列,其中一列是字符串,其余的是双列。我需要在其上运行groupedmap和udf,并在运行之前定义输出的模式。在输出的列数应该相同但类型不同的情况下,如何定义该模式?我能找到的几个Pandas自定义项示例通常只使用 in 作为输出模式。
我见过的一种方法 withColumn 以及 cast()in_df . 这是最佳做法吗?如果我想我的输出是一个完全不同的形状比 in_df 但是有太多的列需要手工编码吗?我还没找到合适的资源。

cidc1ykv

cidc1ykv1#

乌辛 pyspark.sql.types.StructType.fromJson() 您可以从json动态构造模式。
根据您的要求,我更改了用于“col\e”的数据类型,您可以根据您的用例将数据类型更改为一列或多列。

df = spark.read.csv('test.csv',header=True,inferSchema=True)
fields = []
for f in json.loads(df.schema.json())["fields"]:
    if f["name"] == "col_e":
        fields.append(StructField("col_e", StringType(), True))
    else:
        fields.append(StructField.fromJson(f))

schema = StructType(fields)

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def many_cols_data(pdf):
    pdf['col_e'] = "test"
    return pdf

df.groupBy(
    'col_a'
).apply(
    many_cols_data
).show()

输入文件test.csv

col_a,col_b,col_c,col_d,col_e
a,2,3,4,5
b,2,3,4,5
c,2,3,4,5

结果是什么

+-----+-----+-----+-----+-----+
|col_a|col_b|col_c|col_d|col_e|
+-----+-----+-----+-----+-----+
|    c|    2|    3|    4| test|
|    b|    2|    3|    4| test|
|    a|    2|    3|    4| test|
+-----+-----+-----+-----+-----+

相关问题