Bin a numeric column with PySpark (like pandas.cut)

idv4meu8 · posted 2023-02-17 in Spark

I have a PySpark DataFrame df with a numeric column (containing NaNs):

+-------+
|numbers|
+-------+
| 142.56|
|       |
|2023.33|
| 477.76|
| 175.52|
|1737.45|
| 520.72|
|  641.2|
|   79.3|
| 138.43|
+-------+

I want to create a new column that assigns each value to a bin, e.g. 0, (0, 500], (500, 1000], (1000, inf).
Is there a way to do this with something like pandas.cut? My current PySpark approach is to define a UDF, as shown below, but its drawback is that it is tedious and not parametric.

from pyspark.sql import functions as F
from pyspark.sql.types import *

def func(numbers):
    if numbers == 0:
        return '0'
    elif numbers > 0 and numbers <= 500:
        return '(0, 500]'
    elif numbers > 500 and numbers <= 1000:
        return '(500, 1000]'
    elif numbers > 1000:
        return '(1000, inf)'
    else:
        return 'Other'

func_udf = F.udf(func, StringType())

df.withColumn('numbers_bin', func_udf(df['numbers']))

If df were a pandas DataFrame, I would use:

import numpy as np
import pandas as pd

df['numbers_bin'] = pd.cut(
    df['numbers'],
    np.concatenate((-np.inf, [0, 500, 1000], np.inf), axis=None))

which is cleaner and more modular.
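
For anyone reproducing this, here is a minimal sketch of how the sample df shown above might be built (my addition, assuming an active SparkSession named spark; None stands in for the missing value):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Values copied from the column shown above; None is the missing entry
data = [(142.56,), (None,), (2023.33,), (477.76,), (175.52,),
        (1737.45,), (520.72,), (641.2,), (79.3,), (138.43,)]
df = spark.createDataFrame(data, ['numbers'])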

dauxcl2d 1#

You can use Spark ML's Bucketizer:

from pyspark.ml.feature import Bucketizer

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin'
).transform(df)

df2.show()
+-------+-----------+
|numbers|numbers_bin|
+-------+-----------+
| 142.56|        1.0|
|   null|       null|
|2023.33|        3.0|
| 477.76|        1.0|
| 175.52|        1.0|
|1737.45|        3.0|
| 520.72|        2.0|
|  641.2|        2.0|
|   79.3|        1.0|
| 138.43|        1.0|
+-------+-----------+
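
A side note (my addition, not part of the original answer): by default Bucketizer raises an error when it encounters NaN values, and since the question mentions NaNs you may want its handleInvalid option. Roughly, assuming the same df:

from pyspark.ml.feature import Bucketizer

# handleInvalid='keep' routes NaN values into an extra bucket instead of
# failing; 'skip' would drop those rows entirely.
df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin',
    handleInvalid='keep'
).transform(df)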

If you want to display the intervals instead:

import pyspark.sql.functions as F

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers', 
    outputCol='numbers_bin'
).transform(df).withColumn(
    'numbers_bin',
    F.expr("""
        format_string(
            '%s, %s',
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin)],
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin) + 1])
    """)
)

df2.show()
+-------+----------------+
|numbers|     numbers_bin|
+-------+----------------+
| 142.56|      0.0, 500.0|
|   null|      null, null|
|2023.33|1000.0, Infinity|
| 477.76|      0.0, 500.0|
| 175.52|      0.0, 500.0|
|1737.45|1000.0, Infinity|
| 520.72|   500.0, 1000.0|
|  641.2|   500.0, 1000.0|
|   79.3|      0.0, 500.0|
| 138.43|      0.0, 500.0|
+-------+----------------+
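
As an alternative sketch (mine, not from the answers): since the question asks for something parametric without a UDF, the interval labels can also be built directly from the splits list with a chained when expression:

from pyspark.sql import functions as F

splits = [-float('inf'), 0, 500, 1000, float('inf')]
labels = [f"({splits[i]}, {splits[i+1]}]" for i in range(len(splits) - 1)]

# One WHEN branch per (lower, upper] interval
label_col = F.when(
    (F.col('numbers') > splits[0]) & (F.col('numbers') <= splits[1]), labels[0])
for lo, hi, lab in zip(splits[1:-1], splits[2:], labels[1:]):
    label_col = label_col.when(
        (F.col('numbers') > lo) & (F.col('numbers') <= hi), lab)

df_labelled = df.withColumn('numbers_bin', label_col)  # null rows stay null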
6yt4nkrj 2#

To take it a step further:

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf, col, count

BINS = [-float('inf'), 0, 500, 1000, float('inf')]

df_final = Bucketizer(
    splits=BINS,
    inputCol="numbers",
    outputCol="bin_number"
).transform(df)

df_final.show()
+-------+----------+
|numbers|bin_number|
+-------+----------+
| 142.56|       1.0|
|   null|      null|
|2023.33|       3.0|
| 477.76|       1.0|
| 175.52|       1.0|
|1737.45|       3.0|
| 520.72|       2.0|
|  641.2|       2.0|
|   79.3|       1.0|
| 138.43|       1.0|
+-------+----------+

Now, let's add the interval label for each row:

intervals = []
for i in range(0, len(BINS)-1):
    intervals.append(f"({BINS[i]}, {BINS[i+1]}]")
print(intervals)
['(-inf, 0]', '(0, 500]', '(500, 1000]', '(1000, inf]']

I use a broadcast variable to make sure the list is sent to all nodes in the cluster:

mapping = spark.sparkContext.broadcast(intervals)

def get_bins(values):
    # Map a Bucketizer bucket index to its interval label
    def f(x):
        if x is None:
            # null bucket index (null input) falls back to the first interval
            return values[0]
        else:
            return values[int(x)]
    return udf(f)

df_final = df_final.withColumn("interval", get_bins(mapping.value)(col("bin_number")))
df_final.show()
+-------+----------+-----------+
|numbers|bin_number|   interval|
+-------+----------+-----------+
| 142.56|       1.0|   (0, 500]|
|   null|      null|  (-inf, 0]|
|2023.33|       3.0|(1000, inf]|
| 477.76|       1.0|   (0, 500]|
| 175.52|       1.0|   (0, 500]|
|1737.45|       3.0|(1000, inf]|
| 520.72|       2.0|(500, 1000]|
|  641.2|       2.0|(500, 1000]|
|   79.3|       1.0|   (0, 500]|
| 138.43|       1.0|   (0, 500]|
+-------+----------+-----------+

Finally, we can count the rows in each interval:

df_final = df_final.groupBy("interval").agg(count("interval").alias("count")).orderBy(col("count").asc())
df_final.show()
+-----------+-----+
|   interval|count|
+-----------+-----+
|  (-inf, 0]|    1|
|(1000, inf]|    2|
|(500, 1000]|    2|
|   (0, 500]|    5|
+-----------+-----+
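
One caveat (my note, not part of the original answer): grouping on the string label and sorting by count loses the natural order of the intervals. If you want the output ordered by interval, you could group on bin_number instead, while df_final still has that column (i.e. run this in place of the groupBy above):

from pyspark.sql.functions import col, count, first

counts = (
    df_final.groupBy("bin_number")
    .agg(first("interval").alias("interval"), count("*").alias("count"))
    .orderBy(col("bin_number").asc_nulls_first())
)
counts.show()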
