我使用以下代码:
Dataset <Row> dataframee = df1.as("a").join(df2.as("b"),
df2.col("id_device").equalTo(df1.col("ID_device_previous")).
and(df2.col("id_vehicule").equalTo(df1.col("ID_vehicule_previous"))).
and(df2.col("tracking_time").lt(df1.col("date_track_previous")))
,"left").selectExpr("a.*", "b.ID_tracking as ID_pprevious", "b.km as KM_pprevious","b.tracking_time as tracking_time_pprevious","b.speed as speed_pprevious");
我从df2Dataframe获得了df1Dataframe与多行的连接。
但我想加入 df1
Dataframe df2
相同条件和顺序下的Dataframe df2.col("tracking_time") desc limit(0,1)
编辑
我尝试了下面的代码,但不起作用。
df1.registerTempTable("data");
df2.createOrReplaceTempView("tdays");
Dataset<Row> d_f = sparkSession.sql("select a.* from data as a LEFT JOIN (select b.tracking_time from tdays as b where b.id_device = a.ID_device_previous and b.id_vehicule = a.ID_vehicule_previous and b.tracking_time < a.date_track_previous order by b.tracking_time desc limit 1 )");
我需要你的帮助
1条答案
按热度按时间kpbwa7wx1#
我知道你可以用多种方法来做
您可以对加入的dataframee df执行dropduplicates操作。
val finaldf=dataframee.dropduplicates(“”)//指定要在最终输出中不同/唯一的列
(或)
Sparksql