Pyspark中的AutoCast数据类型

q3aa0525  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(92)

我正在从PostgreSQL创建一个parquet文件,它将所有内容标记为varchar列。一旦我们在ADLS中有了文件,我们希望使用Python/Pyspark根据Azure数据块中的日期,整数,varchar字段等数据转换数据类型。相同的代码将被具有不同模式的多个文件使用,因此需要一个通用过程。所以想要一个方法来自动铸造字段

mnemlml8

mnemlml81#

我已经尝试使用pyspark复制和Auto Cast方法来CAST所有的Varchar数据库,根据数据,如Date,Date,Varchar字段。
例如,我使用字符串数据类型创建了数据和列,并将它们在ADLS中设置为Parquet格式。

schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", StringType(), True),
StructField("age", StringType(), True),
StructField("score", StringType(), True)
])
data = [("John", "1990-01-01", "123", "12.34"),
("Alice", "1995-05-15", "456", "56.78")]

下面的代码将对数据类型Date、Date、Varchar进行数据流转换

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
spark = SparkSession.builder.appName("AutoCastParquet").getOrCreate()
Parquet_file_path = "abfss://[email protected]/parquet_sample.parquet"
parquet_df = spark.read.parquet(Parquet_file_path)
casting_functions = {
    "string": lambda col_name: col(col_name),  
    "date": to_date,
    "timestamp": to_timestamp,
    "integer": lambda col_name: col(col_name).cast("integer"),
    "double": lambda col_name: col(col_name).cast("double"),
}
for column_name, data_type in parquet_df.dtypes:
    if "string" in data_type.lower():
        cast_func = casting_functions.get(data_type.lower())
        if cast_func:
            parquet_df = parquet_df.withColumn(column_name, cast_func(column_name))
parquet_df = parquet_df.withColumn(
    "birth_date",
    when(
        to_date(col("birth_date"), "yyyy-MM-dd").isNotNull(),
        to_date(col("birth_date"), "yyyy-MM-dd")
    ).otherwise(None)
)
parquet_df = parquet_df.withColumn(
    "age",
    when(
        col("age").cast("integer").isNotNull(),
        col("age").cast("integer")
    ).otherwise(None)
)
parquet_df = parquet_df.withColumn(
    "score",
    when(
        col("score").cast("double").isNotNull(),
        col("score").cast("double")
    ).otherwise(None)
)
new_schema = StructType([
    StructField("name", StringType(), True),
    StructField("birth_date", DateType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", DoubleType(), True)
])
parquet_df = spark.createDataFrame(parquet_df.rdd, new_schema)
parquet_df.printSchema()
display(parquet_df)

上面的代码从Azure Data Lake Storage读取Parquet文件到Spark DataFrame中,自动识别StringType列,并使用预定义的转换函数将它们转换为推断的数据类型。结果DataFrame,具有更正的数据类型。

相关问题