Pyspark结构化流:无法序列化对象问题

rjzwgtxy  于 2023-06-21  发布在  Spark
关注(0)|答案(1)|浏览(377)

我不明白为什么我的代码会引发错误。
这就是我在做的

streaming_query = (
            df.writeStream.option(
                "checkpointLocation",
                f'{config["checkpoint_location"]}/{rule["module_name"]}',
            )
            .foreachBatch(ProcessActions(config, rule, logger, session=spark_session).process)
            .trigger(processingTime="0 seconds")
            .queryName(rule["module_name"])
            .start()
        )
def process(self, df: pyspark.sql.DataFrame, _: int) -> None:

        if df.isEmpty():
            return

        df.foreachPartition(self.process_rows)

但是每当调用df.foreachPartition时,我都会得到错误:_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object
我使用的是df.collect()而不是df.foreachPartition,但我注意到这不是一个最佳实践,因为它限制了并行性,因为它需要单个节点驱动程序的工作。我之前的代码是这样的:

def process(self, df: pyspark.sql.DataFrame, _: int) -> None:

        if df.isEmpty():
            return

        rows = df.collect()
        self.process_rows(rows)

我尝试删除foreachBatch()中传递给ProcessAction()的每个参数,但它以同样的方式中断。我也尝试过向foreachBatch()传递一个函数,而不是一个类及其方法,但结果是一样的。我在使用df.foreach而不是df.foreachPartition时也会遇到这个问题

rdrgkggo

rdrgkggo1#

您遇到的错误,_pickle. PicklingError:无法序列化对象:TypeError:cannot pickle '_thread. lock' object,在尝试序列化包含不可序列化属性的对象时发生。在这种情况下,问题可能是由于试图将记录器对象**('logger ')传递到' ProcessActions '类导致的,Spark试图将其序列化并分发到工作节点。
Spark的序列化机制使用了Pickle,并且不是所有的对象都可以被pickle。特别是,依赖于底层系统资源(如锁)的对象不能被序列化。
若要解决此问题,可以通过以下方式修改代码:
1.将
'logger'对象创建移动到'ProcessActions'**类中,并使其成为类属性。这样,它就不会在序列化期间作为参数传递。

class ProcessActions:
    def __init__(self, config, rule, session=spark_session):
        self.logger = create_logger()  # Create the logger object inside the class

    def process(self, df: pyspark.sql.DataFrame, _: int) -> None:
        # Rest of your code

1.如果在**'ProcessActions'类中有其他不可序列化的属性,请确保它们是transient(标记为'@transient'**decorator),或者在不需要时将其移到类外。

相关问题