Encoder<Transaction> encoder = Encoders.bean(Transaction.class);
Dataset<Row> transactionDS = sparkSession
.read()
.format("csv")
.option("header", true)
.option("delimiter", ",")
.option("enforceSchema", false)
.option("multiLine", false)
.schema(encoder.schema())
.load("s3a://xxx/testSchema.csv");
.as(encoder);
System.out.println("==============schema starts============");
transactionDS.printSchema();
System.out.println("==============schema ends============");
transactionDS.show(10, true); // this is the line that bombs.
我的简历是-
transactionId,accountId
1,2
10,44
我在日志中打印我的模式-(您看,列现在被翻转或排序了-啊!)
==============schema starts============
root
|-- accountId: integer (nullable = true)
|-- transactionId: long (nullable = true)
==============schema ends============
我正在接近错误
Caused by: java.lang.IllegalArgumentException: CSV header does not conform to the schema.
Header: transactionId, accounted
Schema: accountId, transactionId
Expected: accountId but found: transactionId
这是我的Tranaction
类的样子。
public class Transaction implements Serializable {
private static final long serialVersionUID = 7648268336292069686L;
private Long transactionId;
private Integer accountId;
public Long getTransactionId() {
return transactionId;
}
public void setTransactionId(Long transactionId) {
this.transactionId = transactionId;
}
public Integer getAccountId() {
return accountId;
}
public void setAccountId(Integer accountId) {
this.accountId = accountId;
}
}
问题-为什么Spark无法匹配我的模式?顺序混乱了。在我的CSV中,我传递了transactionid,accountId,但是Spark使用了我的模式accountId,transactionId。啊!
3条答案
按热度按时间csbfibhn1#
不要使用encoder.schema来加载csv文件,它的列顺序可能与csv不一致。
yqlxgs2m2#
不像parquet csv没有模式,所以它不会应用正确的顺序,你可以做的是读取csv没有:
然后将架构应用于刚刚创建的数据集。
uoifb46i3#
这就是我最后做的-
_createSchema()函数如下-