import org.springframework.stereotype.Service;
import java.io.Serializable;
@Service
class Job implements Serializable
{
private Dataset<String> getPropIdValueEqualsDataset(final SparkSession sparkSession, final Dataset<Root> dataset, final String propId)
{
Map<String, Field> propFieldMap = Arrays.stream(FieldUtils.getAllFields(Root.class)).collect(Collectors.toMap(Field::getName, Function.identity()));
Dataset<String> strDataset = dataset.flatMap(new FlatMapFunction<Root, String>()
{
@Override
public Iterator<String> call(Root root) throws Exception
{
List<String> list = new ArrayList<>();
Field dataField = propFieldMap.get(propId);
dataField.setAccessible(true);
String dataFieldValue = (String)dataField.get(root);
list.add(dataFieldValue);
return list.iterator();
}
}, Encoders.bean(String.class));
return strDataset;
}
}
我犯了这两个错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.lang.reflect.Field Serialization stack: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.app.Job$$EnhancerBySpringCGLIB$$96d07
如何修复此错误?
暂无答案!
目前还没有任何答案,快来回答吧!