如何在spark中获取map任务的id?

mlmc2os5  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(415)

有没有办法在spark中获取Map任务的id?例如,如果每个map任务调用一个用户定义的函数,我可以从该用户定义函数的whithin获取该map任务的id吗?

ippsafx7

ippsafx71#

我相信 TaskContext.taskAttemptId 是你想要的。您可以通过获取函数中当前任务的上下文 TaskContext.get .

roejwanj

roejwanj2#

我不知道你所说的Map任务id是什么意思,但你可以使用 TaskContext :

import org.apache.spark.TaskContext

sc.parallelize(Seq[Int](), 4).mapPartitions(_ => {
    val ctx = TaskContext.get
    val stageId = ctx.stageId
    val partId = ctx.partitionId
    val hostname = java.net.InetAddress.getLocalHost().getHostName()
    Iterator(s"Stage: $stageId, Partition: $partId, Host: $hostname")
}).collect.foreach(println)

spark 2.2.0(spark-18576)中的pyspark也添加了类似的功能:

from pyspark import TaskContext
import socket

def task_info(*_):
    ctx = TaskContext()
    return ["Stage: {0}, Partition: {1}, Host: {2}".format(
        ctx.stageId(), ctx.partitionId(), socket.gethostname())]

for x in sc.parallelize([], 4).mapPartitions(task_info).collect():
    print(x)

相关问题