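When running my program, I am trying to save a Spark DataFrame as a Delta table. However, I get an error: the SparkContext is stopped when I call .saveAsTable. I am running a for loop that executes several queries and writes each result to a Delta table, but one particular query does not like being converted to a Delta table. Here is the error: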
An error occurred while calling o4168.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
#lots of at statements
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
org.apache.livy.rsc.driver.SparkEntries.sc(SparkEntries.java:52)
org.apache.livy.rsc.driver.SparkEntries.sparkSession(SparkEntries.java:66)
org.apache.livy.repl.AbstractSparkInterpreter.postStart(AbstractSparkInterpreter.scala:144)
org.apache.livy.repl.SparkInterpreter.$anonfun$start$1(SparkInterpreter.scala:138)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.livy.repl.AbstractSparkInterpreter.restoreContextClassLoader(AbstractSparkInterpreter.scala:495)
org.apache.livy.repl.SparkInterpreter.start(SparkInterpreter.scala:113)
org.apache.livy.repl.Session.$anonfun$start$1(Session.scala:283)
scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
scala.util.Success.$anonfun$map$1(Try.scala:255)
scala.util.Success.map(Try.scala:213)
scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
The currently active sparkContext was created at:
org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
org.apache.livy.rsc.driver.SparkEntries.sc(SparkEntries.java:52)
org.apache.livy.rsc.driver.SparkEntries.sparkSession(SparkEntries.java:66)
org.apache.livy.repl.AbstractSparkInterpreter.postStart(AbstractSparkInterpreter.scala:144)
org.apache.livy.repl.SparkInterpreter.$anonfun$start$1(SparkInterpreter.scala:138)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.livy.repl.AbstractSparkInterpreter.restoreContextClassLoader(AbstractSparkInterpreter.scala:495)
org.apache.livy.repl.SparkInterpreter.start(SparkInterpreter.scala:113)
org.apache.livy.repl.Session.$anonfun$start$1(Session.scala:283)
scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
scala.util.Success.$anonfun$map$1(Try.scala:255)
scala.util.Success.map(Try.scala:213)
scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
#more at statements
Any help is appreciated.
Update: here is my save statement:
spark_df.write.mode("overwrite").format("delta").saveAsTable(tablename)
The error occurs at .saveAsTable.
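The loop described in the question can be structured so that each query's write runs in isolation and every failure is recorded per table. This makes it easier to see which query's write brought the SparkContext down. This is a minimal sketch, not the poster's code: `run_writes`, `delta_writes`, and the `queries` mapping (table name to SQL text) are hypothetical names. The early exit assumes the failure message contains the "Cannot call methods on a stopped SparkContext" text quoted in the error above. No `pyspark` import is needed because the existing session is passed in.

```python
from typing import Callable, Dict, List, Tuple

# Text from the IllegalStateException quoted above (assumption: the message
# is propagated unchanged). Once it appears, every later write on the same
# session is expected to fail as well.
STOPPED_CONTEXT_MSG = "Cannot call methods on a stopped SparkContext"


def delta_writes(spark, queries: Dict[str, str]) -> Dict[str, Callable[[], None]]:
    """Build one deferred Delta write per (table name -> SQL text) pair.

    `spark` is an existing SparkSession; nothing runs until a callable is invoked.
    """
    return {
        # Default arguments bind the current loop values to each lambda.
        name: (lambda q=sql_text, n=name:
               spark.sql(q).write.mode("overwrite").format("delta").saveAsTable(n))
        for name, sql_text in queries.items()
    }


def run_writes(writes: Dict[str, Callable[[], None]]) -> Tuple[List[str], Dict[str, str]]:
    """Run each write in order and record which tables succeeded or failed.

    Stops after the first failure whose message says the SparkContext is
    stopped, so the table whose write reported it is the last entry in `failed`.
    """
    succeeded: List[str] = []
    failed: Dict[str, str] = {}
    for table_name, write in writes.items():
        try:
            write()
            succeeded.append(table_name)
        except Exception as exc:  # Spark errors reach Python through Py4J
            failed[table_name] = str(exc)
            if STOPPED_CONTEXT_MSG in str(exc):
                break
    return succeeded, failed
```

With a live session the two compose as `succeeded, failed = run_writes(delta_writes(spark, queries))`. Inspecting `failed` then shows the full message for each failing table, and the remaining queries are skipped once the context is gone instead of producing a cascade of identical errors.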
Answer:
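Using .saveAsTable() lets you persist the results of a data analysis or processing job as a named table in a structured form, so you can query and analyze that data efficiently later. This is especially useful when working with large datasets or collaborating with others on data analysis projects.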
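As a minimal illustration of the round trip the answer describes, the sketch below saves a DataFrame with `saveAsTable` and later reads it back by name with `spark.table`. The helper names `save_as_delta_table` and `load_table` are hypothetical; `df` is assumed to be a PySpark DataFrame and `spark` an active SparkSession with Delta Lake configured.

```python
def save_as_delta_table(df, table_name: str, mode: str = "overwrite") -> None:
    """Persist `df` as a Delta table registered in the catalog under `table_name`."""
    # Same writer chain as in the question: save mode, Delta format, catalog table.
    df.write.mode(mode).format("delta").saveAsTable(table_name)


def load_table(spark, table_name: str):
    """Read a previously saved table back as a DataFrame, by name only."""
    # No path is needed: the catalog resolves where the table's data lives.
    return spark.table(table_name)
```

Because the table is registered in the catalog, later code on the same catalog can refer to it purely by name, either through `load_table` or in plain SQL such as `spark.sql("SELECT COUNT(*) FROM my_table")`, where `my_table` is a placeholder.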