scala 获取具有最新时间戳的dataframe行(Spark结构化流)

hlswsv35  于 2023-10-18  发布在  Scala
关注(0)|答案(2)|浏览(149)

我有这个dataframe:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |
|BR1   |1632899456         |4.0        |2023-08-09 17:14:24.002 |{f1 -> 42, f2 -> 12}    |
|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |
|BR3   |1632899155         |2.0        |2023-08-09 17:14:24.002 |{f1 -> 72, f2 -> 50}    |
|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |

我想获取以下各项组合的最新“arrival_timestamp”:brand,original_timestamp and features.这是我使用的代码:

import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("brand","original_timestamp","features").orderBy(col("arrival_timestamp").desc)

df.withColumn("maxTS", first("arrival_timestamp").over(windowSpec))
  .select("*").where(col("maxTS") === col("arrival_timestamp"))
  .drop("maxTS")
  .show(false)

这是我期待的输出:

+------+-------------------+-----------+------------------------+------------------------+
|brand |original_timestamp |weight     |arrival_timestamp       |features                |
+------+-------------------+-----------+------------------------+------------------------+
|BR1   |1632899456         |4.0        |2023-08-09 17:12:24.002 |{f1 -> 12, f2 -> 52}    |

|BR1   |1632899456         |2.0        |2023-08-09 17:46:24.002 |{f1 -> 42, f2 -> 12}    |

|BR3   |1632899155         |9.0        |2023-08-09 17:20:24.002 |{f1 -> 72, f2 -> 50}    |

但是,我犯了这样的错误:

分组/联接/窗口分区键不能为map类型。
java.lang.IllegalStateException:grouping/join/window分区键不能是map类型。

这是因为柱状特征是一张Map。
有没有其他的方法来进行分组?
我使用Spark 3.2.2版和SQLContext,使用scala语言。

编辑这是在Spark结构流的背景下

ejk8hzay

ejk8hzay1#

使用to_json函数将map类型转换为某个string类型,并将其传递给Window.partitionBy函数。检查以下样品溶液。

scala> mdf.show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |4.0   |2023-08-09 17:14:24.002|{f1 -> 42, f2 -> 12}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |2.0   |2023-08-09 17:14:24.002|{f1 -> 72, f2 -> 50}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> val windowSpec = Window.partitionBy($"brand",$"original_timestamp",to_json($"features").as("features")).orderBy(col("arrival_timestamp").desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@10b34165

scala> mdf.withColumn("maxTs", first($"arrival_timestamp").over(windowSpec)).where($"maxTs" === $"arrival_timestamp").drop("maxTs").show(false)
+-----+------------------+------+-----------------------+--------------------+
|brand|original_timestamp|weight|arrival_timestamp      |features            |
+-----+------------------+------+-----------------------+--------------------+
|BR1  |1632899456        |4.0   |2023-08-09 17:12:24.002|{f1 -> 12, f2 -> 52}|
|BR1  |1632899456        |2.0   |2023-08-09 17:46:24.002|{f1 -> 42, f2 -> 12}|
|BR3  |1632899155        |9.0   |2023-08-09 17:20:24.002|{f1 -> 72, f2 -> 50}|
+-----+------------------+------+-----------------------+--------------------+
brqmpdu1

brqmpdu12#

要知道to_json有两个明显的功能缺陷:
1.它不对键进行排序(不存在option来进行排序)
1.它只支持字符串键(JSON限制,但似乎与这个问题无关)
它还带来了性能影响,尽管在这种情况下不太可能注意到,因为您可能会忽略之后的结果。
为什么密钥顺序很重要?

spark.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap").
      selectExpr("*", "to_json(amap) jamap", "to_json(bmap) jbmap").
      selectExpr("*", "(jamap = jbmap) same").
      show

产量:

+----------------+----------------+-----------------+-----------------+-----+
|            amap|            bmap|            jamap|            jbmap| same|
+----------------+----------------+-----------------+-----------------+-----+
|{b -> b, a -> a}|{a -> a, b -> b}|{"b":"b","a":"a"}|{"a":"a","b":"b"}|false|
+----------------+----------------+-----------------+-----------------+-----+

虽然Map是相同的,但上面的列相同是假的,因为json表示不相同。
在你的例子中,先sort via the keys再to_json似乎足够了,但是如果有Map的嵌套或者你把字符串键换成了其他类型,你就需要另一种解决方案了。
为了添加通用比较、分组依据等。对于maps,无论嵌套有多深,我都向Quality添加了comparable_maps。一个已知的限制是类型中的间隔也不具有可比性(并且在不同的Spark版本上具有不同的表示)。

com.sparkutils.quality.registerQualityFunctions()
sparkSession.sql("select map('b', 'b', 'a', 'a') amap, map('a', 'a', 'b', 'b') bmap").
      selectExpr("*", "comparable_maps(amap) camap", "comparable_maps(bmap) cbmap").
      selectExpr("*", "(camap = cbmap) same").
      show

产量:

+----------------+----------------+----------------+----------------+----+
|            amap|            bmap|           camap|           cbmap|same|
+----------------+----------------+----------------+----------------+----+
|{b -> b, a -> a}|{a -> a, b -> b}|[{a, a}, {b, b}]|[{a, a}, {b, b}]|true|
+----------------+----------------+----------------+----------------+----+

相关问题