Spark CSV load - custom schema - custom object

yjghlzjz asked on 2023-02-05 in Apache
Encoder<Transaction> encoder = Encoders.bean(Transaction.class);
Dataset<Transaction> transactionDS = sparkSession
        .read()
        .format("csv")
        .option("header", true)
        .option("delimiter", ",")
        .option("enforceSchema", false)
        .option("multiLine", false)
        .schema(encoder.schema())
        .load("s3a://xxx/testSchema.csv")
        .as(encoder);

System.out.println("==============schema starts============");
transactionDS.printSchema();
System.out.println("==============schema ends============");

transactionDS.show(10, true); // this is the line that bombs.

My CSV is -

transactionId,accountId
1,2
10,44

I print my schema in the logs - (you see, the columns are now flipped, i.e. sorted - argh!)

==============schema starts============
root
 |-- accountId: integer (nullable = true)
 |-- transactionId: long (nullable = true)

==============schema ends============

I am getting the error -

Caused by: java.lang.IllegalArgumentException: CSV header does not conform to the schema.
           Header: transactionId, accountId
           Schema: accountId, transactionId
           Expected: accountId but found: transactionId

This is what my Transaction class looks like -

public class Transaction implements Serializable {

    private static final long serialVersionUID = 7648268336292069686L;

    private Long transactionId;
    private Integer accountId;

    public Long getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(Long transactionId) {
        this.transactionId = transactionId;
    }

    public Integer getAccountId() {
        return accountId;
    }

    public void setAccountId(Integer accountId) {
        this.accountId = accountId;
    }
}

Question - why can't Spark match my schema to the file? The order got mixed up: in my CSV I pass transactionId,accountId, but Spark uses its own schema order accountId,transactionId. Argh!

csbfibhn1#

Don't use encoder.schema() to load a CSV file - its column order may not match the CSV.
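A minimal sketch (assuming Spark's Java API and the Transaction bean from the question) that makes the mismatch visible: Encoders.bean() derives its schema from the bean's getters, which are discovered in alphabetical order, not in CSV column order.

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// The bean encoder orders fields alphabetically, so accountId comes first,
// no matter how the CSV header orders its columns.
Encoder<Transaction> encoder = Encoders.bean(Transaction.class);
encoder.schema().printTreeString();
// root
//  |-- accountId: integer (nullable = true)
//  |-- transactionId: long (nullable = true)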

yqlxgs2m2#

Unlike Parquet, a CSV file carries no schema of its own, so Spark will not apply the right column order. What you can do is read the CSV without:

.schema(encoder.schema())

and then apply the schema to the dataset you just created.
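A hedged sketch of that suggestion (the S3 path and column names are taken from the question; the cast types match the Transaction bean): read the CSV with just the header, then cast the string columns to the bean's types and convert with .as(encoder), which matches columns by name rather than by position.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

Encoder<Transaction> encoder = Encoders.bean(Transaction.class);

// Read with the header only; every column arrives as a string.
Dataset<Row> raw = sparkSession
        .read()
        .option("header", true)
        .csv("s3a://xxx/testSchema.csv");

// Cast each column to the bean's type, then map onto the bean by name.
Dataset<Transaction> transactionDS = raw
        .withColumn("transactionId", raw.col("transactionId").cast(DataTypes.LongType))
        .withColumn("accountId", raw.col("accountId").cast(DataTypes.IntegerType))
        .as(encoder);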

uoifb46i3#

This is what I ended up doing -

Encoder<Transaction> encoder = Encoders.bean(Transaction.class);

// read data from S3
System.out.println("Going to read file......................................");
Dataset<Transaction> transactionDS = sparkSession
        .read()
        .format("csv")
        .option("header", true)
        .option("delimiter", ",")
        //.option("enforceSchema", false)
        .option("inferSchema", false)
        .option("dateFormat", "yyyy-MM-dd")
        //.option("multiLine", false)
        //.schema(encoder.schema())
        .schema(_createSchema())
        .csv("s3a://xxx/transactions_4_with_column_names.csv")
        .as(encoder);

The _createSchema() function looks like this -

private static StructType _createSchema() {

    // Build the schema explicitly, listing the fields in the same order
    // as the columns appear in the CSV file.
    List<StructField> list = new ArrayList<StructField>() {

        private static final long serialVersionUID = -4953991596584287923L;

        {
            add(DataTypes.createStructField("transactionId", DataTypes.LongType, true));
            add(DataTypes.createStructField("accountId", DataTypes.IntegerType, true));
            add(DataTypes.createStructField("destAccountId", DataTypes.IntegerType, true));
            add(DataTypes.createStructField("destPostDate", DataTypes.DateType, true));
        }
    };

    return new StructType(list.toArray(new StructField[0]));
}
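This works because the explicit StructType lists the fields in the CSV's column order (Spark maps CSV columns to the schema positionally), while the .as(encoder) step afterwards matches columns by name, so the bean encoder's alphabetical field ordering no longer matters.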
