I have this code (Spark 1.6), which computes a cumulative sum over a window partition:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._

val sconf = new SparkConf()
  .setAppName("TestPonderacion")
  .setMaster("local[*]")
val sc = new SparkContext(sconf)
val sqlContext = new HiveContext(sc)

val schema = StructType(List(
  StructField("CLIENT", IntegerType, true),
  StructField("FAMILY", StringType, true),
  StructField("MOV_DATE", StringType, true),
  StructField("IMP_PONDERACION", DoubleType, true)
))

val data = List(
  Row(20898871, "VAUL 10", "2023-01-01 00:00:00", 25.00),
  Row(20898871, "VAUL 10", "2023-02-03 00:00:00", 25.00),
  Row(20898871, "VAUL 10", "2023-02-04 00:00:00", 1250.00),
  Row(20898871, "VAUL 10", "2023-03-01 00:00:00", -750.00),
  Row(20898871, "VAUL 10", "2023-03-02 00:00:00", 25.00),
  Row(20898871, "VAUL 10", "2023-03-03 00:00:00", 25.00),
  Row(20898871, "VAUL 10", "2023-04-01 00:00:00", -750.00),
  Row(20898871, "VAUL 10", "2023-04-02 00:00:00", 25.00),
  Row(20898871, "VAUL 10", "2023-04-03 00:00:00", 25.00)
)

val mov = sqlContext.createDataFrame(sc.parallelize(data), schema)

// Running sum per (FAMILY, CLIENT) partition, ordered by movement date.
val movPonderados = mov.withColumn(
  "CUMULATIVE_SUM",
  sum("IMP_PONDERACION")
    .over(Window.partitionBy("FAMILY", "CLIENT").orderBy("MOV_DATE"))
    .cast(DecimalType(17, 2))
)
movPonderados.printSchema()
movPonderados.show(false)
The result is:
+--------+-------+-------------------+---------------+--------------+
|CLIENT  |FAMILY |MOV_DATE           |IMP_PONDERACION|CUMULATIVE_SUM|
+--------+-------+-------------------+---------------+--------------+
|20898871|VAUL 10|2023-01-01 00:00:00|25.0           |25.00         |
|20898871|VAUL 10|2023-02-03 00:00:00|25.0           |50.00         |
|20898871|VAUL 10|2023-02-04 00:00:00|1250.0         |1300.00       |
|20898871|VAUL 10|2023-03-01 00:00:00|-750.0         |550.00        |
|20898871|VAUL 10|2023-03-02 00:00:00|25.0           |575.00        |
|20898871|VAUL 10|2023-03-03 00:00:00|25.0           |600.00        |
|20898871|VAUL 10|2023-04-01 00:00:00|-750.0         |-150.00       |
|20898871|VAUL 10|2023-04-02 00:00:00|25.0           |-125.00       |
|20898871|VAUL 10|2023-04-03 00:00:00|25.0           |-100.00       |
+--------+-------+-------------------+---------------+--------------+
But what I want is for the running total to reset to zero whenever it goes negative:
+--------+-------+-------------------+---------------+--------------+
|CLIENT  |FAMILY |MOV_DATE           |IMP_PONDERACION|CUMULATIVE_SUM|
+--------+-------+-------------------+---------------+--------------+
|20898871|VAUL 10|2023-01-01 00:00:00|25.0           |25.00         |
|20898871|VAUL 10|2023-02-03 00:00:00|25.0           |50.00         |
|20898871|VAUL 10|2023-02-04 00:00:00|1250.0         |1300.00       |
|20898871|VAUL 10|2023-03-01 00:00:00|-750.0         |550.00        |
|20898871|VAUL 10|2023-03-02 00:00:00|25.0           |575.00        |
|20898871|VAUL 10|2023-03-03 00:00:00|25.0           |600.00        |
|20898871|VAUL 10|2023-04-01 00:00:00|-750.0         |0.00          | <- Reset to 0 since 600-750 is negative
|20898871|VAUL 10|2023-04-02 00:00:00|25.0           |25.00         | <- Start accumulating again from 0
|20898871|VAUL 10|2023-04-03 00:00:00|25.0           |50.00         |
+--------+-------+-------------------+---------------+--------------+
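In other words, the recurrence I need is CUMULATIVE_SUM(i) = max(0, CUMULATIVE_SUM(i-1) + IMP_PONDERACION(i)): each row depends on the previous, possibly reset, value, so a plain window sum cannot express it.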
All the solutions I have found are for Spark 2.x, such as: Cumulative Sum with Reset BEFORE Negative in Pyspark
But I need to make this work on Spark 1.6. Can anyone give me a hand?
Thanks in advance
1 Answer
For anyone who might be interested, I eventually solved it by replacing the window function with the following code:
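Below is a minimal sketch of one approach that works on Spark 1.6: drop to the RDD API, group by the window's partition columns, sort each group by date, and fold with a reset. This sketch is illustrative (names like withReset and newSchema are assumptions), not necessarily the original answer's exact code:

// Sketch only: resetting running total via the RDD API (works on Spark 1.6).
// Assumes the `mov` DataFrame, `schema`, and `sqlContext` defined above.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val withReset = mov.rdd
  // Key each row by the window's partition columns.
  .groupBy(r => (r.getAs[String]("FAMILY"), r.getAs[Int]("CLIENT")))
  .flatMap { case (_, rows) =>
    // Sort the group the same way the window's orderBy would.
    val sorted = rows.toList.sortBy(_.getAs[String]("MOV_DATE"))
    var acc = 0.0
    sorted.map { r =>
      // Reset to zero whenever the running total would go negative.
      acc = math.max(0.0, acc + r.getAs[Double]("IMP_PONDERACION"))
      Row.fromSeq(r.toSeq :+ acc)
    }
  }

val newSchema = StructType(schema.fields :+ StructField("CUMULATIVE_SUM", DoubleType, true))
val movPonderados = sqlContext.createDataFrame(withReset, newSchema)
movPonderados.show(false)

As in the original snippet, the new column can then be cast to DecimalType(17, 2) if the decimal representation matters.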