我有两个数据集,包含以下数据:
给定源文件中的interchange_rate值,我需要从查找表中获取与rebate_rate最接近的匹配。在图中,您可以看到transaction_id 2具有最高的interchange_rate,因此,我需要获得最高的rebate_rate; transaction_id 1是最低的interchange_rate,因此得到最低的rebate_rate。
我使用Excel手动创建的红色列只是作为示例显示,但这是预期的输出。
我最初的想法是循环源文件中的行,并在查找表中为每行搜索最接近的匹配。但我不是一个经验丰富的PySpark开发人员。我正在寻找帮助编写代码来完成这项任务。
我的第一次尝试是在源文件dataframe中使用foreach()方法,但我得到了一个PicklingError:无法序列化对象:TypeError:无法pickle“_thread.RLock”对象
def get_rebate(row):
buyer_name = row["buyer_name"]
df_buyer = df_lookup.where(f"buyer_name == '{buyer_name}'")
row["rebate_rate"] = df_buyer.select("rebate_rate").first()
return row
# df_final is the source file after a few cosmetics transformations. I need to add a new column "rebate_rate" to it
df_final.foreach(lambda x: get_rebate(x))
1条答案
按热度按时间u5rb5r591#
使用这个+
groupby()
应该可以解决您的问题(这篇文章Find nearest value in numpy array提供了find_nearest函数)