如果行与另一个Dataframe中的id匹配,并且timestamp低于其他帧timestamp,那么如何过滤scala sparkDataframe

kgsdhlau  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(299)

我想根据消息事件的Dataframe中的条目被编辑的时间来过滤它们。我有一个包含消息事件的Dataframe,另一个Dataframe表示何时/是否对它们进行了编辑。如果消息表中的行在已编辑的表中具有匹配索引,并且消息表中的时间戳低于相应的编辑事件,则删除操作应删除消息表中的行。
编辑的Dataframe是:

+----------+-------------------+
| timestamp|index              |
+----------+-------------------+
|1556247980|                 78|
|1558144430|                 87|
|1549964820|                 99|
+----------+-------------------+

消息Dataframe是:

+-------------------+--------------------+------------------+--------------------+
|index              |  commonResponseText|publishedTimestamp|  commonResponseText|
+-------------------+--------------------+------------------+--------------------+
|                 78|Voluptatem enim a...|        1556247974|Voluptatem enim a...|
|                 87|Ut enim enim sunt...|        1558144420|Ut enim enim sunt...|
|                 99|Et est perferendi...|        1549964815|Et est perferendi...|
|                 78|Voluptatem porro ...|        1556248000|Voluptatem porro ...|
|                 87|Atque quod est au...|        1549965000|Atque quod est au...|
+-------------------+--------------------+------------------+--------------------+

我希望结果是:

+-------------------+--------------------+------------------+--------------------+
|commonResponseIndex|  index             |publishedTimestamp|  commonResponseText|
+-------------------+--------------------+------------------+--------------------+
|                 78|Voluptatem porro ...|        1556248000|Voluptatem porro ...|
|                 87|Atque quod est au...|        1549965000|Atque quod est au...|
+-------------------+--------------------+------------------+--------------------+

谢谢你的帮助!

xu3bshqb

xu3bshqb1#

我最后做的是:

val editedDF = Seq(("A",3),("B",3)).toDF("id","timestamp")
val messageDF = Seq(("A",2),("B",2),("A",2),("A",3),("B",4),("A",2),("B",2),("c",9)).toDF("id","timestamp")

最后我使用了这个连接:

// Filter out the edited meesages.
    val editedFilteredDF  = messageDF.join(editedDF,
(editedDF("id") === messageDF("id")) && (editedDF("timestamp") > messageDF("timestamp")),
joinType="left_anti")

结果是:

editedFilteredDF.show()
+---+---------+
| id|timestamp|
+---+---------+
|  A|        3|
|  B|        4|
|  c|        9|
+---+---------+
bweufnob

bweufnob2#

您可以聚合消息表,将其与已编辑的表联接并筛选

import pyspark.sql.functions as F

# Test dataframe

tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2),('c',9)],schema=("id","count"))
tst1 = sqlContext.createDataFrame([('A',4),('B',1)],schema=("id","val"))

# Aggregate and join

tst_g=tst.groupby('id').agg(F.max('count').alias('count'))
tst_j= tst_g.join(tst1,tst_g.id==tst1.id,'left')

# Filter result

tst_f = tst_j.where((F.col('count')>=F.col('val'))|(F.col('val').isNull()))

结果是:

tst_j.show()

+---+-----+----+----+
| id|count|  id| val|
+---+-----+----+----+
|  c|    9|null|null|
|  B|    4|   B|   1|
|  A|    3|   A|   4|
+---+-----+----+----+
 tst_f.show()
+---+-----+----+----+
| id|count|  id| val|
+---+-----+----+----+
|  c|    9|null|null|
|  B|    4|   B|   1|
+---+-----+----+----+

最后,可以删除不相关的列。
如果需要完整的数据,那么可以将update表与message表连接起来,并执行相同的操作。如果更新表很小,那么出于性能原因考虑使用广播连接。


# Approach to join with full table

# Test dataframe

tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2),('c',9)],schema=("id","count"))
tst1 = sqlContext.createDataFrame([('A',4),('B',1)],schema=("id","val"))

# %%

# join with the full table

tst_j= tst.join(tst1,tst.id==tst1.id,'left')

# Filter result

tst_f = tst_j.where((F.col('count')>=F.col('val'))|(F.col('val').isNull()))

提示:如果不想在结果中包含两个id列,可以将join语法更改为tst.join(tst1,on=“id”,how='left')

相关问题