pyspark中的udf

gab6jxml  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(518)

我必须在pyspark的滑动窗口中执行聚合。特别是,我必须执行以下操作:
一次考虑100天的数据
按给定的id列分组
取聚合的最后一个值
求和并返回结果
这些任务必须在具有 .rangeBetween(-100 days, 0) 我可以通过构造一个pandas udf轻松实现这个结果,该udf将pyspark df的一些列作为输入,将它们转换为pandasDataframe,然后计算聚合并返回标量结果。然后将自定义项应用于所需的滑动窗口。
尽管这个解决方案工作得很好,但由于dfs包含数百万行,完成任务需要花费大量时间(3-4小时)。有没有办法提高这种运算的计算时间?我和pyspark一起在databricks工作。
我的自定义项是:

@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

适用于:

days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

编辑:我知道这种聚合可以通过使用numpy数组来实现,而pyspark udf使用numpy结构要快得多。但是,我希望避免使用这种解决方案,因为我需要在相同的框架中应用函数,这些函数比显示的函数复杂得多,而且很难用numpy复制。

4dbbbstv

4dbbbstv1#

我最近不得不实现一个类似的聚合,我的第一个尝试是使用带有滑动窗口的pandas udf。性能非常差,我通过使用以下方法来改进它。
尝试使用 collect_list 合成滑动窗口向量,然后用自定义项Map它们。请注意,只有当滑动窗口可以放入workers内存时(通常是这样)。
这是我的测试代码。第一部分只是你的代码,但作为一个完整的可复制的例子。

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import FloatType, StructType, StructField, IntegerType, StringType

df = spark.createDataFrame(
  [(1, "2021-04-01", 10, -30),
   (1, "2021-03-01", 10, 20),
   (1, "2021-02-01", 10, -1),
   (1, "2021-01-01", 10, 10),
   (1, "2020-12-01", 10, 5),
   (1, "2021-04-01", 20, -5),
   (1, "2021-03-01", 20, -4),
   (1, "2021-02-01", 20, -3),
   (2, "2021-03-01", 10, 5),
   (2, "2021-02-01", 10, 6),
  ], 
  StructType([
    StructField("csecid", StringType(), True), 
    StructField("date", StringType(), True), 
    StructField("analystid", IntegerType(), True), 
    StructField("revisions_improved", IntegerType(), True)
  ]))

### Baseline

@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)

# df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

建议替代方案:


### Method 3

from typing import List

@udf(FloatType())
def method3(analyst: List[int], revisions: List[int]) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return float(df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum())

(df
.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

.withColumn('analyst_win', F.collect_list("analystid").over(w))
.withColumn('revisions_win', F.collect_list("revisions_improved").over(w))

.withColumn('method3', method3(F.collect_list("analystid").over(w), 
                               F.collect_list("revisions_improved").over(w)))
.orderBy("csecid", "date", "analystid")
.show(truncate=False))

结果:

+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|csecid|date      |analystid|revisions_improved|new_col  |analyst_win                 |revisions_win                |method3  |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|1     |2020-12-01|10       |5                 |1.0      |[10]                        |[5]                          |1.0      |
|1     |2021-01-01|10       |10                |1.0      |[10, 10]                    |[5, 10]                      |1.0      |
|1     |2021-02-01|10       |-1                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-02-01|20       |-3                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-03-01|10       |20                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-03-01|20       |-4                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-04-01|10       |-30               |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|1     |2021-04-01|20       |-5                |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|2     |2021-02-01|10       |6                 |1.0      |[10]                        |[6]                          |1.0      |
|2     |2021-03-01|10       |5                 |1.0      |[10, 10]                    |[6, 5]                       |1.0      |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
``` `analyst_win` 以及 `revisions_win` 只是展示了如何创建滑动窗口并将其传递到udf中。它们应该在生产中去除。
将groupby移到udf之外可能会提高性能。spark可以处理这一步。不过,我没有质疑这一部分,因为你提到的功能并不代表实际任务。
查看sparkui中的性能,特别是应用udf的任务的时间统计信息。如果时间很长,请尝试使用 `repartition` 这样每个任务都会产生一个较小的数据子集。

相关问题