我有一个将csv数据读入spark的 Dataset
. 如果我只是简单地阅读并返回 data
.
但是,如果我申请 MapFunction
到 data
在从函数返回之前,我得到
Exception in thread "main" org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: com.Workflow
.
我知道spark正在工作,它需要序列化对象以进行分布式处理,但是,我没有使用任何对spark的引用 Workflow
类在我的Map逻辑中。我不打电话给任何人 Workflow
我的Map逻辑中的类函数。那么spark为什么要尝试序列化呢 Workflow
上课?任何帮助都将不胜感激。
public class Workflow {
private final SparkSession spark;
public Dataset<Row> readData(){
final StructType schema = new StructType()
.add("text", "string", false)
.add("category", "string", false);
Dataset<Row> data = spark.read()
.schema(schema)
.csv(dataPath);
/*
* works fine till here if I call
* return data;
*/
Dataset<Row> cleanedData = data.map(new MapFunction<Row, Row>() {
public Row call(Row row){
/* some mapping logic */
return row;
}
}, RowEncoder.apply(schema));
cleanedData.printSchema();
/* .... ERROR .... */
cleanedData.show();
return cleanedData;
}
}
1条答案
按热度按时间9nvpjoqh1#
您可以使工作流实现序列化,并将sparksession作为@transient