我尝试使用PySpark执行以下代码:
join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \ # JOIN CONDITION
(df_1.C1_REVENUE == df_3.C3_REVENUE_BREAK) & \ # JOIN CONDITION
(df_1.C1_LOSS == df_4.C4_TOTAL_LOSS) & \ # JOIN CONDITION
((df_4.TOTAL_YEAR_PROFIT) > (df_3.TOTAL_GROWTH)) # WHERE CONDITION
df = (df_1.alias('a')
.join(df_2.alias('b'), join_on, 'left')
.join(df_3.alias('c'), join_on, 'left')
.join(df_4.alias('d'), join_on, 'left')
.select(
*[c for c in df_2.columns if c != 'C2_TARGET'],
F.expr("nvl2(b.C2_PROFIT, '500', a.C2_TARGET) C2_TARGET")
)
)
运行查询后出错:
在df_1列中不存在'年利润合计'、'增长合计'、'亏损合计'和'收入突破':
原始SQL查询:
UPDATE (( companyc1
INNER JOIN companyc2
ON company1.c1_profit = company2.c2_profit)
INNER JOIN companyc3
ON company1.c1_revenue = company3.revenue_break)
INNER JOIN companyc4
ON company1.c1_loss = company4.c4_total_loss
SET companyc1.sales = "500"
WHERE (( ( company4.total_year_profit ) > [company3].[total_growth] ))
有谁能帮我找出我在哪里犯了错误吗?
2条答案
按热度按时间wwtsj6pe1#
对于每个
join
操作,必须拆分join_on
条件,如下所示:x8diyxa72#
在翻译包含多个连接的SQL
UPDATE
时,在我看来,普遍安全的方法可能涉及groupBy
、agg
和monotonically_increasing_id
(以确保原始df的行号在聚合后不会缩小)。我在MS Access中制作了以下表格,以确保我建议的方法在Spark中也能以同样的方式工作。
输入:
更新后的结果:
指令集
"Spark"
MS Access似乎聚合了列值,因此下面的代码也将这样做。
输入:
脚本: