python-3.x 使用自定义ptransform时,使用数据流运行器构建光束管道时出现递归错误

inkz8wg9  于 2023-07-01  发布在  Python
关注(0)|答案(2)|浏览(104)

我使用beam构建并成功地在本地运行了一个令人满意的管道,并且准备将作业发送到DataFlow。
我计划只使用save_main_session管道选项pickle我的会话,但是在尝试这样做时遇到了递归错误。经过几次尝试和错误,我设法将其缩小到我定义ptransform_fn的方式,使用装饰器。
请在下面找到一个最小可重复的示例

# my_script.py

from typing import Set
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.ptransform import ptransform_fn

@ptransform_fn
def my_function(pcoll):
    return pcoll | beam.Create([1])

if __name__ == "__main__":
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        p | my_function()

完整的回溯相当长,但以RecursionError: maximum recursion depth exceeded while calling a Python object结束
(Note这是启用此错误的save_main_session=True)选项,因此我可以使用本地运行程序运行此python -m my_script,并将运行到RecursionError
由于ptransform_fn实际上使my_function以一种“非Python”的方式(在没有定义参数的情况下被调用)运行,因此似乎pickler库在这方面有问题。
我最后的问题是:

  • 这是预期的行为吗?如果我想使用保存_main_session,我应该坚持定义继承自Beam.PTransform的类吗?
  • 有没有一种简单的方法(如设置管道选项)能够pickle此脚本并在数据流上运行它?
cnwbcb6i

cnwbcb6i1#

save_main_session本质上有点脆弱;对于任何重要的东西,我建议将逻辑放在一个命名模块中,该模块可以导入到主脚本(和worker)中。

4sup72z8

4sup72z82#

定义一个从beam.PTransform继承的类对我来说很有用。这解决了pickle的函数调用“非Python”的问题。

class MyFunction(beam.PTransform):
    def __init__(self):
        pass

    def expand(pcoll):
        return pcoll | beam.Create([1])

if __name__ == "__main__":
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        p | MyFunction()

相关问题