如何设置spark任务或Map操作的超时(或跳过长时间运行的任务)

tf7tbtn2  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(691)

我正在使用spark并行处理一百万个任务。例如,培训100万名模特。
我需要确保尽可能多的成功,但也有很多失败。在spark中,如果只有一个模型找不到最佳解决方案,它可能会被挂起并永远运行。在这种情况下,spark作业永远不会完成,而kill此作业不会将其他999999模型保存到hdfs。
这个问题真的很严重。
我四处寻找,但没有找到有用的东西: spark.task.maxFailures :没有失败,因此不会生效。 spark.network.timeout :没有网络问题。 spark.executor.heartbeatInterval :无亲属。
核心训练代码,主要使用rdd.map进行训练

df1 = (df.rdd
      .map(lambda r: r.asDict())
      .map(lambda d: transform_data(d))
      .map(lambda d: create_model(d))
      .map(lambda d: model_fit(d))
      .map(lambda d: pickle_model(d))
)

如何设置spark任务的超时?或者有什么好的消炎药吗?

imzjd6km

imzjd6km1#

我不认为这可以是配置级别的控制器。您可能只想将此应用于spark任务的一个子集。 SparkListener 因为您可以在任务、阶段、工作级别挂接,然后使用 sparkContenxt .

/**
   * Called when a task starts
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

在上面,您可以实现超时逻辑。
可以使用sparkcontext通过 def cancelStage(stageId: Int) 您可以从侦听器事件获取特定的id

相关问题