InheritedThreadLocal在spark内部不工作

0ejtzxu1  于 2023-05-01  发布在  Apache
关注(0)|答案(1)|浏览(195)

我有一个项目,不同类型的作业在spark中运行,我们将数据转换为spark Dataframe ,并在这些 Dataframe 上应用foreach lambda,使它们并行执行。现在对于一个作业,我需要在作业开始时存储一些变量,并在不同的API,db调用中使用它们,所以我想到使用ThreadLocal来存储这些变量,然后在需要时从同一个ThreadLocal中选择。
但不久之后,我意识到spark lambda forEach正在创建不同的线程,因此ThreadLocal无法工作,所以我转移到InheritedThreadLocal,在那里我也无法获取变量。这里有一个演示代码来显示我在做什么

class Util {
    static InheritedThreadLocal<Map> threadLocal = new InheritedThreadLocal(); 
}

class Job {
    void runJob() {
        Utils.threadLocal.set(key1, value1)
        val list = someListData.asInstanceOf[RDD[Long]]
        printThreadInfo1()
        list.forEach(key => {
            doSomething()
        })
    }
    
    void doSomething() {
        printThreadLocal2()
        Utils.threadLocal.get(key1); // returns null value
    }
}

这里InheritedThreadLocal在doSomething内部返回null值,所以我尝试打印线程Info这里是printThreadInfo1()=〉threadId=1和ThreadName = main的结果,但对于printThreadInfo2()内部,打印不同的信息为threadId=34 threadName=Executor task launch worker for task 7。0在阶段0中。0(TID 7)
我认为在lambda下创建的线程应该是主线程的子线程,因此InheritedThreadLocal应该工作,但看起来它们是不同的线程。
这里有没有我做错的地方,或者有什么方法可以在不同的spark员工之间共享这些变量?

ddrv8njm

ddrv8njm1#

您看到的不同线程是预期的行为。
方法runJob()在驱动程序上执行,而方法doSomething()是在其中一个工作节点上执行的任务。这意味着这段代码可能在另一个JVM中运行,因此ThreadLocals将不起作用。
Spark docs中有一个图示说明了设置:

你可以使用broadcasts来代替ThreadLocals。This answer展示了一个简单的示例如何广播JavaMap。

相关问题