取消嵌套后,Pyspark列错误地转换为字符串

nbnkbykc  于 2023-08-03  发布在  Spark
关注(0)|答案(1)|浏览(95)

我们有一个Pyspark数据集,其中每个列都有一个带参数的嵌套结构:string_value和int_value。对于每一列,只有一个值被填充(另一个是null),对于同一个参数,它总是相同的值。然而,我们不知道哪一列有字符串值,哪一列有整数值(这个例子是一个简化)。
我们尝试将列的值提取为正确的类型(请参见下面的代码),但它一直作为字符串列返回。

from pyspark.sql.types import DoubleType, LongType, StringType, StructType, StructField
from pyspark.sql import Row
import pyspark.sql.functions as F

schema = StructType([
   StructField("id", StringType(), True),
   StructField("string_col", StructType([
       StructField("string_value", StringType(), True),
       StructField("int_value", LongType(), True)
   ]), True),
   StructField("integer_col", StructType([
       StructField("string_value", StringType(), True),
       StructField("int_value", LongType(), True)
   ]), True)
   ])

df = spark.createDataFrame(
    [
        Row(id='01', string_col=Row(string_value='A', int_value=None), integer_col=Row(string_value=None, int_value=65)),
        Row(id='02', string_col=Row(string_value='B', int_value=None), integer_col=Row(string_value=None, int_value=101)),
        Row(id='03', string_col=Row(string_value='C', int_value=None), integer_col=Row(string_value=None, int_value=384))
    ],
    schema)

df.printSchema()

字符串
返回的模式:

root
 |-- id: string (nullable = true)
 |-- string_col: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
 |    |-- int_value: long (nullable = true)
 |-- integer_col: struct (nullable = true)
 |    |-- string_value: string (nullable = true)
 |    |-- int_value: long (nullable = true)


尝试提取列:

for expanded_col in ['string_col', 'integer_col']:
    df = (df.withColumn(expanded_col, 
                       F.when(F.col(expanded_col + '.int_value').isNotNull(), F.col(expanded_col + '.int_value').cast(IntegerType()))
                        .otherwise(F.col(expanded_col + '.string_value'))))
df.printSchema()


返回的模式:

root
 |-- id: string (nullable = true)
 |-- string_col: string (nullable = true)
 |-- integer_col: string (nullable = true)

bpsygsoo

bpsygsoo1#

让我们修复并简化您的代码

cmap = {'string_col': 'string_value', 'integer_col': 'int_value'}
for k, v in cmap.items():
    df = df.withColumn(k, F.col(k)[v])

字符串
对于pyspark版本>=3.3.0,您可以使用一个liner

df = df.withColumns({k: F.col(k)[v] for k, v in cmap.items()})
df.show()
+---+----------+-----------+
| id|string_col|integer_col|
+---+----------+-----------+
| 01|         A|         65|
| 02|         B|        101|
| 03|         C|        384|
+---+----------+-----------+

df.printSchema()
root
 |-- id: string (nullable = true)
 |-- string_col: string (nullable = true)
 |-- integer_col: long (nullable = true)

的数据

相关问题