PYSPARK如何使用dict创建UDF,然后使用UDF向 Dataframe 添加列

inn6fuwd  于 2022-11-21  发布在  Spark
关注(0)|答案(2)|浏览(142)

我 需要 在 pyspark 中 创建 一 个 UDF 来 转换 字母 等级( " A " 、 " B " 、 " C " 、 " D " 、 " F " ) 到 数字 等级( 4 、 3 、 2 、 1 和 0 ) 。 然后 我 需要 将 此 函数 注册 为 spark UDF 。 接下来 ,我 有 一 个 数据 帧 " current _ gpa " 。 Current _ gpa 有 一 个 名 为 " grade " 的 列 。 我 需要 向 数据 帧 current _ gpa 添加 一 个 名 为 " num _ grade " 的 列其中 列 " grade " 中 的 字母 等级 被 转换 为 列 " num _ grade " 中 的 相应 数字 。
这 是 我 创建 的 UDF :

def get_num(letter):
    letter_class_dict = {"A": 1, "B": 2, "C": 3, "D": 4, "F": 5}
    for letter, l in letter_class_dict():
        x['letter'] = l
 
    return l

get_num =  udf(lambda letter: letter_class_dict.get(letter))
get_num_udf = F.udf(get_num, IntegerType())

中 的 每 一 个
这 是 数据 帧 current _ gpa :

+-------+-------+------+----+-----+-------+
| course|term_id|   sid| fid|grade|credits|
+-------+-------+------+----+-----+-------+
|BIO 101|  2000B|100001|1007|    F|      3|
|BIO 102|  2000B|100001|1007|    F|      4|
|CHM 101|  2000B|100001|1002|    F|      4|
|BIO 103|  2000B|100001|1007|    F|      4|
|GEN 114|  2000B|100001|1006|    F|      3|
+-------+-------+------+----+-----+-------+

格式
我 尝试 使用 此 UDF 添加 一 个 列 ' num _ grade ' , 其中 的 值 应 如下 所 示 :
第 一 次
这 会 产生 错误 :UDF 引发 了 异常 错误 :' 执行 阶段 错误 :SparkContext 只能 在 驱动 程序 上 创建 和 访问 。 " 。 完整 追溯 如下 :

ivqmmu1c

ivqmmu1c1#

您不需要UDF来执行此操作,并且您应该始终尽量避免UDF(除非绝对必要),因为Spark无法优化它们,这可能会导致性能下降。
这是一个简单的情况,当(when().otherwise())操作可以使用列表解析中的字典项或python的本地map函数来构建时。

letter_class_dict = {"A": 4, "B": 3, "C": 2, "D": 1, "F": 0}

# create individual case when statement for each swap
letter_class_casewhens = map(lambda a: func.when(func.col('grade') == a[0], func.lit(a[1])), 
                             letter_class_dict.items()
                             )

# [Column<'CASE WHEN (grade = A) THEN 4 END'>,
#  Column<'CASE WHEN (grade = B) THEN 3 END'>,
#  Column<'CASE WHEN (grade = C) THEN 2 END'>,
#  Column<'CASE WHEN (grade = D) THEN 1 END'>,
#  Column<'CASE WHEN (grade = F) THEN 0 END'>]

# pass the case when statements in a `coalesce` function
data_sdf. \
    withColumn('num_grades', func.coalesce(*letter_class_casewhens)). \
    show()

# +-------+-------+------+----+-----+-------+----------+
# | course|term_id|   sid| fid|grade|credits|num_grades|
# +-------+-------+------+----+-----+-------+----------+
# |BIO 101|  2000B|100001|1007|    F|      3|         0|
# |BIO 102|  2000B|100001|1007|    F|      4|         0|
# |CHM 101|  2000B|100001|1002|    F|      4|         0|
# |BIO 103|  2000B|100001|1007|    F|      4|         0|
# |GEN 114|  2000B|100001|1006|    F|      3|         0|
# +-------+-------+------+----+-----+-------+----------+
5gfr0r5j

5gfr0r5j2#

下面是我创建UDF将字母等级转换为数字的过程:

def convert_grades(letter):
    letter_grades = {
    'A':4,
    'B': 3,
    'C':2,
    'D':1,
    'F':0
  }
    return letter_grades.get(letter)
 
grade_points = spark.udf.register('convert_grades', convert_grades)

相关问题