PySpark: extract hyphen-separated values from a column and apply a UDF

Asked by 92dk7w1h on 2022-12-22, in Spark

I have a dataframe that looks like this:

+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber|                rest|        errorCode|errorType |    errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|       9|     11|      0|      XXXX2288|110XXXX2288MKKKKK...|         CHAR0088|     ERROR|Records out of se...|           N|
|       9|     12|      0|      XXXX2288|130XXXX22880011ZZ...|         CHAR0088|     ERROR|Records out of se...|           N|
|       9|     18|      0|      XXXX2288|140XXXX2288      ...|         CHAR0088|     ERROR|Records out of se...|           N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+

The following code populates the errorType and errorDescription columns using UDFs. The UDFs (namely resolveErrorTypeUDF and resolveErrorDescUDF) take an errorCode as input and return the corresponding errorType and errorDescription, respectively.

errorFinalDf = errorDfAll.na.fill("") \
 .withColumn("errorType", resolveErrorTypeUDF(col("errorCode"))) \
 .withColumn("errorDescription", resolveErrorDescUDF(col("errorCode"))) \
 .withColumn("isSuccessful", when(trim(col("errorCode")).eqNullSafe(""), "Y").otherwise("N")) \
 .dropDuplicates()

Note that previously the errorCode column only ever contained a single error code. Now it may contain one or more error codes separated by -. I need to map every error code to its errorType and errorDescription and write them into the corresponding columns, likewise separated by -.
The new dataframe should look like this:

+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber|                rest|        errorCode|errorType |    errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|       7|      1|      0|      XXXX8822|010XXXX8822XBCDEF...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     11|      0|      XXXX8822|110XXXX8822LLLLLL...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     12|      0|      XXXX8822|120XXXX8822011GB ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     18|      0|      XXXX8822|180XXXX8822      ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     18|      0|      XXXX8822|180XXXX88220     ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+

What changes are needed to handle this new situation? Please help. Thanks.

9rygscc1 (Answer #1)

You only need a minimal change to your UDFs.
Suppose you have a simple Python function, get_type_from_code, that converts a string containing a single error code into the corresponding type (the same approach works for descriptions).

from pyspark.sql import functions as F, types as T

def get_type_from_code(c: str) -> str:
    """Function to convert error code to error type.

    Mind the interface: string in, string out
    """
    return {'CHAR0009': 'ERROR', 'CHAR0021': 'WARNING'}.get(c, 'UNKNOWN')

@F.udf(returnType=T.StringType())
def convert_errcodes_to_types(codes: str) -> str:
    """Convert a string of error codes separated by '-' in a correspondent string of types concatenated with '-'"""
    return '-'.join(
        map(get_type_from_code, codes.split('-'))
    )
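
To wire this into the existing pipeline, a minimal sketch might look like the following (convert_errcodes_to_descs is an assumed UDF, defined analogously to convert_errcodes_to_types but backed by a code-to-description lookup; both names are illustrative):

# Hypothetical wiring of the new UDFs into the original chain.
# Assumes convert_errcodes_to_descs is defined like convert_errcodes_to_types,
# but maps each code to its description instead of its type.
errorFinalDf = errorDfAll.na.fill("") \
    .withColumn("errorType", convert_errcodes_to_types(F.col("errorCode"))) \
    .withColumn("errorDescription", convert_errcodes_to_descs(F.col("errorCode"))) \
    .withColumn("isSuccessful", F.when(F.trim(F.col("errorCode")).eqNullSafe(""), "Y").otherwise("N")) \
    .dropDuplicates()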

Done!
