pyspark: Why do Window functions fail with "Window function X does not take a frame specification"?

2ledvvac · posted 2023-10-15 in Spark

I am trying to use the Spark 1.4 window functions in pyspark 1.4.1,
but I mostly get errors or unexpected results. Here is a very simple example that I think should work:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])

wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

df.select(df.a, func.rank().over(wSpec).alias("rank"))  
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))  
    ===>  org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

wSpec = Window.orderBy(df.a)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]

As you can see, if I add the rowsBetween frame specification, neither rank() nor lag()/lead() accepts it: "Window function does not take a frame specification".
If I omit the rowsBetween frame specification, lag()/lead() at least don't throw an exception, but return unexpected (to me) results: always None. And rank() still doesn't work, just with a different exception.
Can anybody help me get my window functions right?
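For reference, the result I expected can be modeled in plain Python (a sketch with no Spark involved, just the lag/lead semantics over rows ordered by a):

```python
# Plain-Python sketch (not Spark) of the lag/lead semantics I expect:
# over rows ordered by "a", lag(b, 1) is the previous row's b (None at the
# start of the partition) and lead(b, 1) is the next row's b (None at the end).
rows = [(1, 101), (2, 202), (3, 303), (4, 404), (5, 505)]
ordered = sorted(rows)                     # order by "a"
bs = [b for _, b in ordered]

prev = [None] + bs[:-1]                    # lag(b, 1)
nxt = bs[1:] + [None]                      # lead(b, 1)

expected = list(zip([a for a, _ in ordered], prev, bs, nxt))
# first row: (1, None, 101, 202); last row: (5, 404, 505, None)
```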

Update

OK, this looks like a bug. I prepared the same test in pure Spark (Scala, spark-shell):

import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;

val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")).collect()
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])

So rowsBetween cannot be applied in Scala either, but when rowsBetween is omitted, both rank() and lag()/lead() work as expected there.


polhcujo1#

As far as I can tell there are two different problems here. The Hive GenericUDAFRank, GenericUDAFLag and GenericUDAFLead simply do not support window frame definitions, so the errors you see when you pass rowsBetween are expected behavior.
Regarding the problem with the following PySpark code:

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))

This seems to be related to my question https://stackoverflow.com/q/31948194/1560062 and should be resolved by SPARK-9978. For now you can make it work by changing the window definition to:

wSpec = Window.partitionBy().orderBy(df.a)
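As a side note, the values this corrected spec should produce for rank() can be modeled in plain Python (a sketch, not Spark's actual implementation): a row's rank is 1 plus the number of rows that sort strictly before it, so ties share a rank and the following rank is skipped.

```python
# Plain-Python model of rank() over rows ordered by the given values:
# rank = 1 + number of rows sorting strictly earlier, so ties share a rank.
def rank(values):
    ordered = sorted(values)
    # ordered.index(v) is the position of the first occurrence of v,
    # i.e. the count of strictly smaller values.
    return [1 + ordered.index(v) for v in ordered]

print(rank([101, 202, 202, 303]))  # [1, 2, 2, 4] -- tie shares rank 2, rank 3 is skipped
print(rank([1, 2, 3, 4, 5]))       # [1, 2, 3, 4, 5]
```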

qlckcl4x2#

In pyspark, the Window spec below will throw an error (when used with lag).

joined_windowSpec = Window.partitionBy("a_x").orderBy('justDate', 'table_name').rowsBetween(Window.unboundedPreceding, 0)

The error thrown:

pyspark.errors.exceptions.captured.AnalysisException: Cannot specify window frame for lag function.

Whereas if we omit the rowsBetween specification, it does not throw an error.

joined_windowSpec = Window.partitionBy("a_x").orderBy('justDate', 'table_name')  ## NO ERROR
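Note that the rowsBetween(Window.unboundedPreceding, 0) frame itself is fine for genuine aggregates such as sum(); it is only offset and ranking functions (lag, lead, rank, ...) that reject an explicit frame, because they define their own. A plain-Python sketch (no Spark) of what that frame computes for sum():

```python
# Plain-Python sketch of sum().over(w.rowsBetween(unboundedPreceding, 0)):
# for each row, sum the values from the start of the partition up to and
# including the current row -- i.e. a running total.
def running_sum(values):
    total, out = 0, []
    for v in values:
        total += v
        out.append(total)
    return out

print(running_sum([101, 202, 303]))  # [101, 303, 606]
```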
