我尝试在Google Dataflow上运行Apache Beam管道,该管道从Google BigQuery读取数据,添加模式,将其转换为Dataframe,并使用第三方库(scrubadub
)对该 Dataframe 执行转换。
在GCP上的Google Code CLI中,我运行:
/usr/bin/python /home/test_user/dataflow_beam_test.py --requirements_file /home/test_user/requirements.txt
按照under "PyPi Dependencies" here的说明,我的requirements.txt文件包含(以及其他包):
scrubadub==2.0.0
我无法获得管道来将我正在使用的第三方Python库(scrubadub
)安装到远程工作者上。我已经验证了这个包可以在本地工作。
下面是相关代码:
with beam.Pipeline(argv=argv) as p:
pcoll = (p | 'read_bq_view' >>
beam.io.ReadFromBigQuery(query=BIGQUERY_SELECT_QUERY,use_standard_sql=True)
| 'ToRows' >> beam.Map(lambda x: beam.Row(id=x['id'], user_id=x['user_id'],query=x['query']))
)
df = beam_convert.to_dataframe(pcoll)
df['query'] = df['query'].apply(lambda x: scrubadub.clean(x))
这个代码块中的最后一行是导致错误的原因(我已经通过注解它并成功运行管道进行了确认)。
我尝试过在文件的顶层导入scrubadub,并将其作为run()
函数的一部分;都会抛出相同的错误:
/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 826, in _import_module return __import__(import_name) ModuleNotFoundError: No module named 'scrubadub'
值得注意的是,pip install -r requirements.txt
似乎从未在工作线程上运行过。
1条答案
按热度按时间htrmnn0y1#
要解决您的问题并将
PyPi
包与Beam一起使用,我建议您使用setup.py
文件,例如:例如,将此文件放在项目的根目录下。
然后,在使用
Dataflow
启动Beam
作业的命令行中,使用setup_file
程序参数:请务必遵循以下步骤以正确启动作业:
2.45.0
):apache_beam==2.45.0
和scrubadub==2.0.0
包的版本应该与工作进程使用的包相同。runner将示例化作业。setup.py
文件提供的包(与runner使用的包版本相同)。在setup.py
中,您不必安装Beam,因为它已经提供了运行时环境。