我正在从PostgreSQL创建一个parquet文件,它将所有内容标记为varchar列。一旦我们在ADLS中有了文件,我们希望使用Python/Pyspark根据Azure数据块中的日期,整数,varchar字段等数据转换数据类型。相同的代码将被具有不同模式的多个文件使用,因此需要一个通用过程。所以想要一个方法来自动铸造字段
mnemlml81#
我已经尝试使用pyspark复制和Auto Cast方法来CAST所有的Varchar数据库,根据数据,如Date,Date,Varchar字段。例如,我使用字符串数据类型创建了数据和列,并将它们在ADLS中设置为Parquet格式。
schema = StructType([ StructField("name", StringType(), True), StructField("birth_date", StringType(), True), StructField("age", StringType(), True), StructField("score", StringType(), True) ]) data = [("John", "1990-01-01", "123", "12.34"), ("Alice", "1995-05-15", "456", "56.78")]
下面的代码将对数据类型Date、Date、Varchar进行数据流转换
from pyspark.sql import SparkSession from pyspark.sql.functions import col, to_date, to_timestamp, when from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType spark = SparkSession.builder.appName("AutoCastParquet").getOrCreate() Parquet_file_path = "abfss://[email protected]/parquet_sample.parquet" parquet_df = spark.read.parquet(Parquet_file_path) casting_functions = { "string": lambda col_name: col(col_name), "date": to_date, "timestamp": to_timestamp, "integer": lambda col_name: col(col_name).cast("integer"), "double": lambda col_name: col(col_name).cast("double"), } for column_name, data_type in parquet_df.dtypes: if "string" in data_type.lower(): cast_func = casting_functions.get(data_type.lower()) if cast_func: parquet_df = parquet_df.withColumn(column_name, cast_func(column_name)) parquet_df = parquet_df.withColumn( "birth_date", when( to_date(col("birth_date"), "yyyy-MM-dd").isNotNull(), to_date(col("birth_date"), "yyyy-MM-dd") ).otherwise(None) ) parquet_df = parquet_df.withColumn( "age", when( col("age").cast("integer").isNotNull(), col("age").cast("integer") ).otherwise(None) ) parquet_df = parquet_df.withColumn( "score", when( col("score").cast("double").isNotNull(), col("score").cast("double") ).otherwise(None) ) new_schema = StructType([ StructField("name", StringType(), True), StructField("birth_date", DateType(), True), StructField("age", IntegerType(), True), StructField("score", DoubleType(), True) ]) parquet_df = spark.createDataFrame(parquet_df.rdd, new_schema) parquet_df.printSchema() display(parquet_df)
上面的代码从Azure Data Lake Storage读取Parquet文件到Spark DataFrame中,自动识别StringType列,并使用预定义的转换函数将它们转换为推断的数据类型。结果DataFrame,具有更正的数据类型。
1条答案
按热度按时间mnemlml81#
我已经尝试使用pyspark复制和Auto Cast方法来CAST所有的Varchar数据库,根据数据,如Date,Date,Varchar字段。
例如,我使用字符串数据类型创建了数据和列,并将它们在ADLS中设置为Parquet格式。
下面的代码将对数据类型Date、Date、Varchar进行数据流转换
上面的代码从Azure Data Lake Storage读取Parquet文件到Spark DataFrame中,自动识别StringType列,并使用预定义的转换函数将它们转换为推断的数据类型。结果DataFrame,具有更正的数据类型。