手动向Spark Dataframe 传递模式

tzcvj98z  于 2023-02-19  发布在  Apache
关注(0)|答案(1)|浏览(144)

问:有没有一种方法可以只将Column_names传递给spark df,并期望spark来推断模式类型?
我的场景:我正在尝试使用Kubernetes启动一个spark作业,它基本上从AWS S3读取CSV文件并使用spark.read.csv()创建一个spark df。
如果CSV文件没有头,我需要将schema手动传递给spark Dataframe ,这可以通过以下方法实现。

schema = StructType([
         StructField('column_name', StringType(), True),
         StructField('column_name1', StringType(), True)
       ])
df = spark.read.csv( csv_file, header = False, schema = schema )

这都很好。
但是
问题:我正在将所需参数(如S3_access_key、secret_key、column_names ...等)作为环境变量传递到执行器pod。请参考以下代码片段。

ArgoDriverV2.ArgoDriver.create_spark_job(
        's3-connector', 
        'WriteS3', 
        namespace='default', 
        executors=2,
        args={
            "USER":self.user.id,
            "COLUMN_SCHEMA": json.dumps(column_names),
            "S3_FILE_KEYS":json.dumps(s3_file_keys),
            "S3_ACCESS_KEY": params['access_key'],
            "S3_SECRET_KEY": params['secret_key'],
            "N_EXECUTORS":2,
    })

使用column_names,我可以在spark job中生成模式并将其传递给 Dataframe ,但是我发现这种方法有点复杂。

有没有办法只将Column_names传递给spark df,并期望spark推断模式类型?

gmxoilav

gmxoilav1#

您可以使用inferSchema=true读取csv,然后简单地如下重命名列:

# let's say that we have a list of desired column names
cols = ['a', 'b', 'c']

df = spark.read.option("inferSchema", True).csv("test")
df = df.select([df[x].alias(y) for x,y in zip(df.columns, cols)])

相关问题