我想在spark程序的末尾创建一个数据集,并将该数据集保存到mysql。我用listener来监听job end,并在其中创建和保存数据集。
听众是
SparkListener sparkListener=new SparkListener() {
@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) {
try{
TimeUtils.saveParameterData(sparkSession,appNameSplit,batchJobRunId,moduleName);
}catch (Exception e){
}
super.onJobEnd(jobEnd);
}
};
sparkSession.sparkContext().addSparkListener(sparkListener);
saveparameterdata方法的保存方式如下
try{
//the error occurs at this line
df.write().mode(SaveMode.Append).jdbc(url,"Time_Count_Stats",connectionProperties);
}catch (Exception e){
System.out.println(e);
}
连接属性为
Properties connectionProperties=new Properties();
connectionProperties.setProperty("user","***myusername****");
connectionProperties.setProperty("password","****mypassword****");
connectionProperties.setProperty("driver","com.mysql.cj.jdbc.Driver");
这给了我一个错误
java.lang.illegalstateexception:无法对已停止的sparkcontext调用方法。
我一直试图解决这个错误,但找不到解决办法。有人能帮忙吗?
暂无答案!
目前还没有任何答案,快来回答吧!