问:有没有一种方法可以只将Column_names传递给spark df,并期望spark来推断模式类型?
我的场景:我正在尝试使用Kubernetes启动一个spark作业,它基本上从AWS S3读取CSV文件并使用spark.read.csv()
创建一个spark df。
如果CSV文件没有头,我需要将schema手动传递给spark Dataframe ,这可以通过以下方法实现。
schema = StructType([
StructField('column_name', StringType(), True),
StructField('column_name1', StringType(), True)
])
df = spark.read.csv( csv_file, header = False, schema = schema )
这都很好。
但是
问题:我正在将所需参数(如S3_access_key、secret_key、column_names ...等)作为环境变量传递到执行器pod。请参考以下代码片段。
ArgoDriverV2.ArgoDriver.create_spark_job(
's3-connector',
'WriteS3',
namespace='default',
executors=2,
args={
"USER":self.user.id,
"COLUMN_SCHEMA": json.dumps(column_names),
"S3_FILE_KEYS":json.dumps(s3_file_keys),
"S3_ACCESS_KEY": params['access_key'],
"S3_SECRET_KEY": params['secret_key'],
"N_EXECUTORS":2,
})
使用column_names,我可以在spark job中生成模式并将其传递给 Dataframe ,但是我发现这种方法有点复杂。
有没有办法只将Column_names传递给spark df,并期望spark推断模式类型?
1条答案
按热度按时间gmxoilav1#
您可以使用
inferSchema=true
读取csv,然后简单地如下重命名列: