我试图合并5从csv的dataframe和创建新的dataframe比在第二步中,我创建空表与自定义模式,现在我想从dataframe加载记录。
这里是一步一步的细节。
1.从所有5个 Dataframe 创建 Dataframe
cr_df = spark.read.format("csv").option("header", "true").load("abfss://abcxxxxxxxxxxxx.dfs.core.windows.net/Position1.csv")
ir_df = spark.read.format("csv").option("header", "true").load("abfss://abc2xxxxxxxxxxxx.dfs.core.windows.net/Position2.csv")
fx_df = spark.read.format("csv").option("header", "true").load("abfss://abc3xxxxxxxxxxxx.dfs.core.windows.net/Position3.csv")
eq_df = spark.read.format("csv").option("header", "true").load("abfss://abc4xxxxxxxxxxxx.dfs.core.windows.net/Position4.csv")
co_df = spark.read.format("csv").option("header", "true").load("abfss://derdi@abc5xxxxxxxxxxxx.dfs.core.windows.net/Position5.csv")
1.合并以上 Dataframe :
merged_df = cr_df.unionByName(ir_df, allowMissingColumns=True) \
.unionByName(fx_df, allowMissingColumns=True) \
.unionByName(eq_df, allowMissingColumns=True) \
.unionByName(co_df, allowMissingColumns=True)
1.使用自定义架构创建空表:
CREATE TABLE staging.ddr_position_test
(
ReportDate DATE ,
JurisdictionId INTEGER ,
TransactionId VARCHAR(256) ,
ReportAssetClass VARCHAR(30) ,
ReportTradeSequence DECIMAL(4) ,
LoadId INTEGER,
DataSourceId VARCHAR(4),
Cleared VARCHAR(50)
) USING DELTA
PARTITIONED BY (ReportDate, ReportAssetClass)
LOCATION 'abfss://slv-container@xyzxxxxxxxxxxxxx.dfs.core.windows.net//silver//delta/ddr_position_test/'
第4步:我得到shcema不匹配错误,因此我创建了列数据类型为这个合并dataframe,请注意,我不能使用模式重写或合并选项,因为我有某些类型的列数据类型的要求,也有一些列需要重命名.
df = decimal_to_string_Cols_df.withColumn("ReportDate", lit("ReportDate").cast(DateType())) \
.withColumn("JurisdictionId", lit("JurisdictionId").cast(IntegerType())) \
.withColumn("ReportAssetClass", lit("ReportAssetClass").cast(StringType())) \
.withColumn("ReportTradeSequence", lit("ReportTradeSequence").cast(DecimalType(4))) \
.withColumn("LoadId", lit("LoadId").cast(IntegerType())) \
.withColumn("CreatedTimestamp", lit("CreatedTimestamp").cast(TimestampType()))
作为最后一步(第5步),我正在编写并保存以下内容
df.write.mode('append').format('delta') \
.option("path", "abfss://container@abcdxxxxxxxxxx.dfs.core.windows.net/delta/ddr_position_test/") \
.saveAsTable("staging.ddr_position_test")
现在我得到这个错误-x1c 0d1x
在做了一些研究后,我相信这个错误可能是由于空或非空列(只是一个疯狂的猜测),任何帮助或建议将不胜感激。
1条答案
按热度按时间3qpi33ja1#
问题是在一些表列的定义中,就像固定长度一样-看起来其中一列的某些值超过了指定的限制。
因此,您要么需要增加特定列的大小,要么不使用
varchar(N)
,而使用对文本大小没有硬性限制的string
类型。