我有一个场景,需要在databricks pyspark中创建多个线程。每个线程都在生成一个作业。这个工作流程大大减少了我的时间。但是我面临一个问题,如果我强制停止我的单元执行,我的子线程就不会被终止。
有时,如果我们想做一些更改,就必须取消单元格执行,因为我们不能等待单元格执行完成。但问题是,如果我取消我的databricks单元格,那么它就被取消了,但是如果我转到spark ui,我可以看到正在创建的作业,它们继续在那里执行。
我要找的是,当我取消我的细胞,所有的子线程应该得到销毁以及。但目前我还没有找到解决办法。我们将不胜感激。谢谢。
我的示例测试代码:
线程函数:
def threadWorker(spark, poolId, fileName):
spark.sparkContext.setLocalProperty("spark.scheduler.pool", poolId)
print(f"Running for {fileName}:")
time.sleep(40)
# Create a dataframe
df = spark.createDataFrame(
[(1,'a',2.0),
(2,'b', 3.0),
(3,'c', 4.0)],
['c1', 'c2', 'c3']
)
df.write\
.format("parquet") \
.mode("overwrite") \
.option("ignoreLeadingWhiteSpace", False) \
.option("ignoreTrailingWhiteSpace", False) \
.option("header", "false") \
.save(f"/Experiments/Bilal/XmlExperiments/{fileName}");
spark.sparkContext.setLocalProperty("spark.scheduler.pool", None)
呼叫手机(如果我想停止执行,我会取消):
print("Started")
hourThreads = []
hourThread1 = threading.Thread(target=threadWorker, args=(spark, "SamplePool1", "file1"))
hourThread1.daemon = True
hourThreads.append(hourThread1)
hourThread2 = threading.Thread(target=threadWorker, args=(spark, "SamplePool2", "file2"))
hourThread2.daemon = True
hourThreads.append(hourThread2)
for hourThread in hourThreads:
hourThread.start()
for hourThread in hourThreads:
hourThread.join()
print("Completed")
暂无答案!
目前还没有任何答案,快来回答吧!