csv 无法在pyspark Dataframe 中保存AsTable:apache.spark.sql.delta.schema.InvariantViolationException错误:超过char/varchar类型长度

ktca8awb  于 2023-05-26  发布在  Spark
关注(0)|答案(1)|浏览(149)

我试图合并5从csv的dataframe和创建新的dataframe比在第二步中,我创建空表与自定义模式,现在我想从dataframe加载记录。
这里是一步一步的细节。
1.从所有5个 Dataframe 创建 Dataframe

cr_df = spark.read.format("csv").option("header", "true").load("abfss://abcxxxxxxxxxxxx.dfs.core.windows.net/Position1.csv")
ir_df = spark.read.format("csv").option("header", "true").load("abfss://abc2xxxxxxxxxxxx.dfs.core.windows.net/Position2.csv")
fx_df = spark.read.format("csv").option("header", "true").load("abfss://abc3xxxxxxxxxxxx.dfs.core.windows.net/Position3.csv")
eq_df = spark.read.format("csv").option("header", "true").load("abfss://abc4xxxxxxxxxxxx.dfs.core.windows.net/Position4.csv")
co_df = spark.read.format("csv").option("header", "true").load("abfss://derdi@abc5xxxxxxxxxxxx.dfs.core.windows.net/Position5.csv")

1.合并以上 Dataframe :

merged_df = cr_df.unionByName(ir_df, allowMissingColumns=True) \
    .unionByName(fx_df, allowMissingColumns=True) \
    .unionByName(eq_df, allowMissingColumns=True) \
    .unionByName(co_df, allowMissingColumns=True)

1.使用自定义架构创建空表:

CREATE TABLE staging.ddr_position_test
       (
        ReportDate            DATE ,
        JurisdictionId        INTEGER ,
        TransactionId         VARCHAR(256) ,
        ReportAssetClass      VARCHAR(30) ,
        ReportTradeSequence   DECIMAL(4) ,
        LoadId                INTEGER,
        DataSourceId          VARCHAR(4),
        Cleared               VARCHAR(50)
) USING DELTA 
PARTITIONED BY (ReportDate, ReportAssetClass) 
LOCATION 'abfss://slv-container@xyzxxxxxxxxxxxxx.dfs.core.windows.net//silver//delta/ddr_position_test/'

第4步:我得到shcema不匹配错误,因此我创建了列数据类型为这个合并dataframe,请注意,我不能使用模式重写或合并选项,因为我有某些类型的列数据类型的要求,也有一些列需要重命名.

df = decimal_to_string_Cols_df.withColumn("ReportDate", lit("ReportDate").cast(DateType())) \
        .withColumn("JurisdictionId", lit("JurisdictionId").cast(IntegerType())) \
        .withColumn("ReportAssetClass", lit("ReportAssetClass").cast(StringType())) \
        .withColumn("ReportTradeSequence", lit("ReportTradeSequence").cast(DecimalType(4))) \
        .withColumn("LoadId", lit("LoadId").cast(IntegerType())) \
        .withColumn("CreatedTimestamp", lit("CreatedTimestamp").cast(TimestampType()))

作为最后一步(第5步),我正在编写并保存以下内容

df.write.mode('append').format('delta') \
  .option("path", "abfss://container@abcdxxxxxxxxxx.dfs.core.windows.net/delta/ddr_position_test/") \
  .saveAsTable("staging.ddr_position_test")

现在我得到这个错误-x1c 0d1x

在做了一些研究后,我相信这个错误可能是由于空或非空列(只是一个疯狂的猜测),任何帮助或建议将不胜感激。

3qpi33ja

3qpi33ja1#

问题是在一些表列的定义中,就像固定长度一样-看起来其中一列的某些值超过了指定的限制。
因此,您要么需要增加特定列的大小,要么不使用varchar(N),而使用对文本大小没有硬性限制的string类型。

相关问题