在部署了一个spark结构流应用程序之后,如何在执行器上获得一个spark会话,以便部署另一个具有相同会话和相同配置设置的作业?
knsnq2tg1#
如果你在集群模式下运行spark,你不能将spark会话连接到执行器,因为spark会话对象不能被序列化,因此不能将其发送到执行器。而且,这样做违反了spark设计原则。如果你能告诉我问题的陈述,我也许能帮助你。
0g0grzrc2#
从技术上讲,你可以在执行器上得到spark session,不管你在哪种模式下运行它,但不值得这么做。Spark session是各种内部spark设置的对象沿着我们在启动时提供的其他用户定义设置。这些配置设置在执行器中不可用的唯一原因是,它们中的大多数被标记为transient,这意味着这些对象将作为null发送,因为将它们发送到执行器没有逻辑意义,同样,将数据库连接对象从一个节点发送到另一个节点也没有意义。要做到这一点,一种麻烦的方法是从驱动程序中的Spark会话获取所有配置设置,设置一些标记为可序列化的自定义对象,并将其发送到执行器。此外,您的执行器环境应该与驱动程序在所有spark jar/目录和其他spark属性(如SPARK_HOME等)方面相同,如果您运行并意识到每次您丢失了一些东西,则可能会非常繁忙。它将是一个不同的spark会话对象,但具有所有相同的设置。更好的选择是使用您为其他应用程序提供的相同设置运行另一个spark应用程序,因为一个spark会话与一个spark应用程序相关联。
xqkwcwgp3#
这是不可能的。我也有类似的需求,然后我必须创建两个单独的主类和一个spark启动器类,因为我是根据我想要运行的类来做sparksession.conf.set(主类名)的。如果我想运行这两个类,那么我会使用thread.sleep()在启动另一个类之前先完成。我还使用sparkListener代码来获取它是否完成的状态。
vuv7lop34#
我知道这是一个迟来的回应。只是觉得这可能是有用的。所以,你可以在你的spark结构化流媒体应用程序中使用类似下面的代码片段:Spark型〈= 3.2.1
spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()
对于Spark版本〉= 3.3.1:
spark_session_for_this_micro_batch = microBatchOutputDF.sparkSession
您的函数可以使用此spark会话在其中创建 Dataframe 。您可以参考此medium post pyspark文档
4条答案
按热度按时间knsnq2tg1#
如果你在集群模式下运行spark,你不能将spark会话连接到执行器,因为spark会话对象不能被序列化,因此不能将其发送到执行器。而且,这样做违反了spark设计原则。
如果你能告诉我问题的陈述,我也许能帮助你。
0g0grzrc2#
从技术上讲,你可以在执行器上得到spark session,不管你在哪种模式下运行它,但不值得这么做。Spark session是各种内部spark设置的对象沿着我们在启动时提供的其他用户定义设置。
这些配置设置在执行器中不可用的唯一原因是,它们中的大多数被标记为transient,这意味着这些对象将作为null发送,因为将它们发送到执行器没有逻辑意义,同样,将数据库连接对象从一个节点发送到另一个节点也没有意义。
要做到这一点,一种麻烦的方法是从驱动程序中的Spark会话获取所有配置设置,设置一些标记为可序列化的自定义对象,并将其发送到执行器。此外,您的执行器环境应该与驱动程序在所有spark jar/目录和其他spark属性(如SPARK_HOME等)方面相同,如果您运行并意识到每次您丢失了一些东西,则可能会非常繁忙。它将是一个不同的spark会话对象,但具有所有相同的设置。
更好的选择是使用您为其他应用程序提供的相同设置运行另一个spark应用程序,因为一个spark会话与一个spark应用程序相关联。
xqkwcwgp3#
这是不可能的。我也有类似的需求,然后我必须创建两个单独的主类和一个spark启动器类,因为我是根据我想要运行的类来做sparksession.conf.set(主类名)的。如果我想运行这两个类,那么我会使用thread.sleep()在启动另一个类之前先完成。我还使用sparkListener代码来获取它是否完成的状态。
vuv7lop34#
我知道这是一个迟来的回应。只是觉得这可能是有用的。所以,你可以在你的spark结构化流媒体应用程序中使用类似下面的代码片段:
Spark型〈= 3.2.1
对于Spark版本〉= 3.3.1:
您的函数可以使用此spark会话在其中创建 Dataframe 。您可以参考此medium post pyspark文档