我正在pyspark中处理一个Dataframe并使用 asyncio
在上面Map两个不同的函数。
比如说Dataframe( x_df
)看起来像这样:
速度卷26.023432.0123
第一个函数,我们称之为 a()
,应用于此Dataframe时,提供:
speedvolumemodel\ U版本26.0234v1.0.032.0123v1.0.1
我称这个结果为 a_df
.
第二个函数,让我们调用它 b()
,应用于此Dataframe时,提供:
速度卷模型类型26.0234svm32.0123nn
我称这个结果为 b_df
.
我想把这两个Dataframe合并成一个,这样我的最终结果是:
speedvolumemodel\类型Model\版本26.0234svmv1.0.032.0123nnv1.0.1
我这样做是因为:
schema_fields = list(a_df.schema.fields) + list(b_df.schema.fields)
schema = StructType(schema_fields)
merged_df = a_df.rdd.zip(b_df.rdd).map(lambda x: x[0] + x[1])
现在,当我检查 merged_df
与 spark.createDataFrame(merged_df, schema).show()
,我看到:这些列:
speedvolumemodel\类型speedvolumemodel\版本
如何消除重复数据 speed
& volume
柱?我在躲避 join
因为我的Dataframe很大,有很多公共列(超过10个),而且其中一些具有复杂类型,而不仅仅是整数或字符串。
我用asyncio来运行函数 a()
以及 b()
同时,我不想改变这一点,因为这是为了获得一些性能提升而故意的。
1条答案
按热度按时间q8l4jmvw1#
保持核心列分开,让a和b只返回新列:
举个简单的例子:
a(df)输出:
简单b示例:
输出b(df):
简单测试数据框:
输出:
zip和Map功能:
结果:
输出: