sparkDataframe数组与python列表不同吗?

wkyowqbh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(440)

如果我有Spark DataFrame 包含 arrays ,我可以通过自定义项在这些数组上使用python列表方法吗?我怎样才能点燃Spark
DataFrame array<double> 把它变成python列表?
下面是一个例子,有几个自定义项。我不知道为什么取最大值有效,但是 len 没有。最后,我想用原始数组列的采样值创建一个新列。这也得到了一个错误,期待两个参数,加分,如果你可以帮助!
我有以下Spark DataFrame :

from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random

df = sc.parallelize([Row(name='Joe',scores=[1.0,2.0,3.0]),
Row(name='Mary', scores=[3.0]),
Row(name='Mary', scores=[4.0,7.1])]).toDF()
>>> df.show()
+----+---------------+
|name|         scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary|          [3.0]|
|Mary|     [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn,samples):
    tempList = array()
    count=0
    while (count<samples):
        tempList.append(random.sample(listIn,1)[0])
        count=count+1
    return tempList

def maxArray(listIn):
    return max(listIn)

def lenArray(listIn):
    return len(listIn)
sampUDF=udf(sampleWithReplacement,ArrayType())
maxUDF=udf(maxArray,IntegerType())
lenUDF=udf(lenArray,IntegerType())

>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|  null|
|Mary|          [3.0]|  null|
|Mary|     [4.0, 7.1]|  null|
+----+---------------+------+

>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|     3|
|Mary|          [3.0]|     1|
|Mary|     [4.0, 7.1]|     2|
+----+---------------+------+
p8ekf7hl

p8ekf7hl1#

热释光;当你有选择的时候,总是喜欢内置函数而不是 udf . 计算长度使用 size (化名为 length) 方法:

from pyspark.sql.functions import length, size 

df.withColumn("len", size("scores"))

对于小型阵列,可以尝试

from pyspark.sql.functions import sort_array

df.withColumn("max", sort_array("scores", False)[0])

当然,对于大型藏品来说,这不是一个好的选择。
sparkDataframe数组与python列表不同吗?
在内部它们是不同的,因为有scala对象。在中访问时 udf 有简单的python列表。那么到底出了什么问题?
让我们看看这些类型。 scores 列为 array<double> . 当转换为python类型时,这将导致 List[float] . 当你打电话的时候 max 你得到一个 float 在输出上。
但是您将返回类型声明为 IntegerType . 因为 float 无法转换为整数精度损失结果未定义 NULL . 返回类型的正确选择是 DoubleType 或者 FloatType :

maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())

(sc
    .parallelize([("Joe", [1.0, 2.0, 3.0])])
    .toDF(["name", "scores"])
    .select("*", maxf("scores"), maxd("scores")))

结果如下:

+----+---------------+----------------+----------------+
|name|         scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]|             3.0|             3.0|
+----+---------------+----------------+----------------+

和架构:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- <lambda>(scores): float (nullable = true)
 |-- <lambda>(scores): double (nullable = true)

相关问题