我有这个dataframe:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |4.0 |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |2.0 |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
我想获取以下各项组合的最新“arrival_timestamp”:brand,original_timestamp and features.这是我使用的代码:
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)
df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
.select("*").where(col("maxTS") === col("arrival_timestamp"))
.drop("maxTS")
.show(false)
这是我期待的输出:
+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight |arrival_timestamp |features |
+------+-------------------+-----------+------------------------+------------------------+
|BR1 |1632899456 |4.0 |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52} |
|BR1 |1632899456 |2.0 |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12} |
|BR3 |1632899155 |9.0 |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50} |
但是,我犯了这样的错误:
分组/联接/窗口分区键不能为map类型。
java.lang.IllegalStateException:grouping/join/window分区键不能是map类型。
这是因为柱状特征是一张Map。
有没有其他的方法来进行分组?
我使用Spark 3.2.2版和SQLContext,使用scala语言。
编辑:这是在Spark结构流的背景下
2条答案
按热度按时间ejk8hzay1#
使用
to_json
函数将map
类型转换为某个string
类型,并将其传递给Window.partitionBy
函数。检查以下样品溶液。brqmpdu12#
要知道to_json有两个明显的功能缺陷:
1.它不对键进行排序(不存在option来进行排序)
1.它只支持字符串键(JSON限制,但似乎与这个问题无关)
它还带来了性能影响,尽管在这种情况下不太可能注意到,因为您可能会忽略之后的结果。
为什么密钥顺序很重要?
产量:
虽然Map是相同的,但上面的列相同是假的,因为json表示不相同。
在你的例子中,先sort via the keys再to_json似乎足够了,但是如果有Map的嵌套或者你把字符串键换成了其他类型,你就需要另一种解决方案了。
为了添加通用比较、分组依据等。对于maps,无论嵌套有多深,我都向Quality添加了comparable_maps。一个已知的限制是类型中的间隔也不具有可比性(并且在不同的Spark版本上具有不同的表示)。
产量: