PySpark在GroupBy之后加入

owfi6suc  于 2023-05-06  发布在  Spark
关注(0)|答案(2)|浏览(169)

我有两个dataframes,我想做的是按组/分区加入它们。如何在PySpark中实现?
第一个df包含由id、时间戳和值标识的3个时间序列。注意到时间序列包含一些间隙(缺失天数)

第二个df包含一个没有间隔的时间序列

我想达到的结果是

ercv8c1e

ercv8c1e1#

第二个df上的Left join和**coalesce**将适用于这种情况。

Example:

df.show()
#---+--------+-----+
#tag|      ts|value|
#---+--------+-----+
#  a|01-01-19|   45|
#  a|03-01-19|   89|
#  a|04-01-19|   24|
#  a|05-01-19|  778|
#---+--------+-----+

df1.show()
#+--------+
#|      ts|
#+--------+
#|01-01-19|
#|02-01-19|
#|03-01-19|
#|04-01-19|
#|05-01-19|
#+--------+

df1.alias("t1").join(df.alias("t2"),col("t1.ts")==col("t2.ts"),"left").\
selectExpr("coalesce(t1.ts,t2.ts) as ts","tag","value").\
orderBy("ts").\
show()

#+--------+----+-----+
#|      ts| tag|value|
#+--------+----+-----+
#|01-01-19|   a|   45|
#|02-01-19|null| null|
#|03-01-19|   a|   89|
#|04-01-19|   a|   24|
#|05-01-19|   a|  778|
#+--------+----+-----+
mi7gmzs6

mi7gmzs62#

我会这样做:

df2 = (
    df2
    .withColumn("tag", F.array([F.lit("a"), F.lit("b"), F.lit("c")]))
    .withColumn("ts", F.explode("ts"))
)

df_out = df1.join(df2, ["tag","ts"], "outer")

如果你想有一个动态生成的标签列表,你可以从df1创建。

相关问题