如何在pyspark数据库中杀死python线程?

3duebb1j  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(261)

我有一个场景,需要在databricks pyspark中创建多个线程。每个线程都在生成一个作业。这个工作流程大大减少了我的时间。但是我面临一个问题,如果我强制停止我的单元执行,我的子线程就不会被终止。
有时,如果我们想做一些更改,就必须取消单元格执行,因为我们不能等待单元格执行完成。但问题是,如果我取消我的databricks单元格,那么它就被取消了,但是如果我转到spark ui,我可以看到正在创建的作业,它们继续在那里执行。
我要找的是,当我取消我的细胞,所有的子线程应该得到销毁以及。但目前我还没有找到解决办法。我们将不胜感激。谢谢。
我的示例测试代码:
线程函数:

def threadWorker(spark, poolId, fileName):
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", poolId)

  print(f"Running for {fileName}:")

  time.sleep(40)

  # Create a dataframe
  df = spark.createDataFrame(
      [(1,'a',2.0), 
      (2,'b', 3.0),
      (3,'c', 4.0)],
      ['c1', 'c2', 'c3']
  )

  df.write\
     .format("parquet") \
     .mode("overwrite") \
     .option("ignoreLeadingWhiteSpace", False) \
     .option("ignoreTrailingWhiteSpace", False) \
     .option("header", "false") \
     .save(f"/Experiments/Bilal/XmlExperiments/{fileName}");

  spark.sparkContext.setLocalProperty("spark.scheduler.pool", None)

呼叫手机(如果我想停止执行,我会取消):

print("Started")

hourThreads = []

hourThread1 = threading.Thread(target=threadWorker, args=(spark, "SamplePool1", "file1"))
hourThread1.daemon = True
hourThreads.append(hourThread1)

hourThread2 = threading.Thread(target=threadWorker, args=(spark, "SamplePool2", "file2"))
hourThread2.daemon = True
hourThreads.append(hourThread2)

for hourThread in hourThreads:
  hourThread.start()

for hourThread in hourThreads:
  hourThread.join()

print("Completed")

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题