有没有办法在spark中获取Map任务的id?例如,如果每个map任务调用一个用户定义的函数,我可以从该用户定义函数的whithin获取该map任务的id吗?
ippsafx71#
我相信 TaskContext.taskAttemptId 是你想要的。您可以通过获取函数中当前任务的上下文 TaskContext.get .
TaskContext.taskAttemptId
TaskContext.get
roejwanj2#
我不知道你所说的Map任务id是什么意思,但你可以使用 TaskContext :
TaskContext
import org.apache.spark.TaskContext sc.parallelize(Seq[Int](), 4).mapPartitions(_ => { val ctx = TaskContext.get val stageId = ctx.stageId val partId = ctx.partitionId val hostname = java.net.InetAddress.getLocalHost().getHostName() Iterator(s"Stage: $stageId, Partition: $partId, Host: $hostname") }).collect.foreach(println)
spark 2.2.0(spark-18576)中的pyspark也添加了类似的功能:
from pyspark import TaskContext import socket def task_info(*_): ctx = TaskContext() return ["Stage: {0}, Partition: {1}, Host: {2}".format( ctx.stageId(), ctx.partitionId(), socket.gethostname())] for x in sc.parallelize([], 4).mapPartitions(task_info).collect(): print(x)
2条答案
按热度按时间ippsafx71#
我相信
TaskContext.taskAttemptId
是你想要的。您可以通过获取函数中当前任务的上下文TaskContext.get
.roejwanj2#
我不知道你所说的Map任务id是什么意思,但你可以使用
TaskContext
:spark 2.2.0(spark-18576)中的pyspark也添加了类似的功能: