我不明白为什么我的代码会引发错误。
这就是我在做的
streaming_query = (
df.writeStream.option(
"checkpointLocation",
f'{config["checkpoint_location"]}/{rule["module_name"]}',
)
.foreachBatch(ProcessActions(config, rule, logger, session=spark_session).process)
.trigger(processingTime="0 seconds")
.queryName(rule["module_name"])
.start()
)
def process(self, df: pyspark.sql.DataFrame, _: int) -> None:
if df.isEmpty():
return
df.foreachPartition(self.process_rows)
但是每当调用df.foreachPartition
时,我都会得到错误:_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object
我使用的是df.collect()
而不是df.foreachPartition
,但我注意到这不是一个最佳实践,因为它限制了并行性,因为它需要单个节点驱动程序的工作。我之前的代码是这样的:
def process(self, df: pyspark.sql.DataFrame, _: int) -> None:
if df.isEmpty():
return
rows = df.collect()
self.process_rows(rows)
我尝试删除foreachBatch()
中传递给ProcessAction()
的每个参数,但它以同样的方式中断。我也尝试过向foreachBatch()
传递一个函数,而不是一个类及其方法,但结果是一样的。我在使用df.foreach
而不是df.foreachPartition
时也会遇到这个问题
1条答案
按热度按时间rdrgkggo1#
您遇到的错误,_pickle. PicklingError:无法序列化对象:TypeError:cannot pickle '_thread. lock' object,在尝试序列化包含不可序列化属性的对象时发生。在这种情况下,问题可能是由于试图将记录器对象**('logger ')传递到' ProcessActions '类导致的,Spark试图将其序列化并分发到工作节点。
Spark的序列化机制使用了Pickle,并且不是所有的对象都可以被pickle。特别是,依赖于底层系统资源(如锁)的对象不能被序列化。
若要解决此问题,可以通过以下方式修改代码:
1.将'logger'对象创建移动到'ProcessActions'**类中,并使其成为类属性。这样,它就不会在序列化期间作为参数传递。
1.如果在**'ProcessActions'类中有其他不可序列化的属性,请确保它们是transient(标记为'@transient'**decorator),或者在不需要时将其移到类外。