不使用join从spark dataframe中删除重复列

iq3niunx  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(389)

我正在pyspark中处理一个Dataframe并使用 asyncio 在上面Map两个不同的函数。
比如说Dataframe( x_df )看起来像这样:
速度卷26.023432.0123
第一个函数,我们称之为 a() ,应用于此Dataframe时,提供:
speedvolumemodel\ U版本26.0234v1.0.032.0123v1.0.1
我称这个结果为 a_df .
第二个函数,让我们调用它 b() ,应用于此Dataframe时,提供:
速度卷模型类型26.0234svm32.0123nn
我称这个结果为 b_df .
我想把这两个Dataframe合并成一个,这样我的最终结果是:
speedvolumemodel\类型Model\版本26.0234svmv1.0.032.0123nnv1.0.1
我这样做是因为:

schema_fields = list(a_df.schema.fields) + list(b_df.schema.fields)
schema = StructType(schema_fields)
merged_df = a_df.rdd.zip(b_df.rdd).map(lambda x: x[0] + x[1])

现在,当我检查 merged_dfspark.createDataFrame(merged_df, schema).show() ,我看到:这些列:
speedvolumemodel\类型speedvolumemodel\版本
如何消除重复数据 speed & volume 柱?我在躲避 join 因为我的Dataframe很大,有很多公共列(超过10个),而且其中一些具有复杂类型,而不仅仅是整数或字符串。
我用asyncio来运行函数 a() 以及 b() 同时,我不想改变这一点,因为这是为了获得一些性能提升而故意的。

q8l4jmvw

q8l4jmvw1#

保持核心列分开,让a和b只返回新列:
举个简单的例子:

def a(df): 
    return df.select(
        when(col('speed') % 2 == 0, 'svm').otherwise('nn').alias('model_type')
    )

a(df)输出:

+----------+
|model_type|
+----------+
|        nn|
|       svm|
|        nn|
|       svm|
|        nn|
+----------+

简单b示例:

def b(df): 
    return df.select(
        when(col('speed') % 2 == 0, 'v1.0.1').otherwise('v1.0.0').alias('model_version')
    )

输出b(df):

+-------------+
|model_version|
+-------------+
|       v1.0.0|
|       v1.0.1|
|       v1.0.0|
|       v1.0.1|
|       v1.0.0|
+-------------+

简单测试数据框:

df = spark.createDataFrame([{"speed": 1, "volume": 1}, 
                            {"speed": 2, "volume": 1}, 
                            {"speed": 3, "volume": 1}, 
                            {"speed": 4, "volume": 1}, 
                            {"speed": 5, "volume": 1}])

输出:

+-----+------+
|speed|volume|
+-----+------+
|    1|     1|
|    2|     1|
|    3|     1|
|    4|     1|
|    5|     1|
+-----+------+

zip和Map功能:

def merge(df1, df2): 
    schema = StructType([*df1.schema, *df2.schema]) 
    return spark.createDataFrame(df1.rdd.zip(df2.rdd).map(lambda x: x[0] + x[1]), schema)

结果:

result_df = merge(
    # Original DF with core columns
    df, 
    merge(
        # df with only model_type
        a(df),
        # df with only model_version
        b(df)
    )
)

输出:

+-----+------+----------+-------------+
|speed|volume|model_type|model_version|
+-----+------+----------+-------------+
|    1|     1|        nn|       v1.0.0|
|    2|     1|       svm|       v1.0.1|
|    3|     1|        nn|       v1.0.0|
|    4|     1|       svm|       v1.0.1|
|    5|     1|        nn|       v1.0.0|
+-----+------+----------+-------------+

相关问题