我使用beam构建并成功地在本地运行了一个令人满意的管道,并且准备将作业发送到DataFlow。
我计划只使用save_main_session
管道选项pickle我的会话,但是在尝试这样做时遇到了递归错误。经过几次尝试和错误,我设法将其缩小到我定义ptransform_fn
的方式,使用装饰器。
请在下面找到一个最小可重复的示例
# my_script.py
from typing import Set
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.ptransform import ptransform_fn
@ptransform_fn
def my_function(pcoll):
return pcoll | beam.Create([1])
if __name__ == "__main__":
options = PipelineOptions()
options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=options) as p:
p | my_function()
完整的回溯相当长,但以RecursionError: maximum recursion depth exceeded while calling a Python object
结束
(Note这是启用此错误的save_main_session=True
)选项,因此我可以使用本地运行程序运行此python -m my_script
,并将运行到RecursionError
)
由于ptransform_fn
实际上使my_function
以一种“非Python”的方式(在没有定义参数的情况下被调用)运行,因此似乎pickler库在这方面有问题。
我最后的问题是:
- 这是预期的行为吗?如果我想使用保存_main_session,我应该坚持定义继承自Beam.PTransform的类吗?
- 有没有一种简单的方法(如设置管道选项)能够pickle此脚本并在数据流上运行它?
2条答案
按热度按时间cnwbcb6i1#
save_main_session
本质上有点脆弱;对于任何重要的东西,我建议将逻辑放在一个命名模块中,该模块可以导入到主脚本(和worker)中。4sup72z82#
定义一个从
beam.PTransform
继承的类对我来说很有用。这解决了pickle的函数调用“非Python”的问题。