Apache Beam传递列表作为参数- Python SDK

wydwbb8l  于 2023-03-04  发布在  Python
关注(0)|答案(1)|浏览(104)

我有一个ApacheBeam管道,它将一个列表作为参数,并在Filter and Map函数中使用它。由于这些列表可以作为字符串使用,我已经使用ast.literal_eval对它们进行了转换。有没有其他更好的方法来做同样的事情?

import argparse
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

def run_pipeline(custom_args, beam_args):
    elements = [
     {'name': 'Jim', 'join_year': 2010, 'location': 'LA', 'role': 'Executive assistant'},
     {'name': 'Tim', 'join_year': 2015, 'location': 'NY', 'role': 'Account manager'},
     {'name': 'John', 'join_year': 2010, 'location': 'LA', 'role': 'Customer service representative'},
     {'name': 'Bob', 'join_year': 2020, 'location': 'NJ', 'role': 'Customer service representative'},
     {'name': 'Michael', 'join_year': 2019, 'location': 'CA', 'role': 'Scheduler'},
     {'name': 'Adam', 'join_year': 2010, 'location': 'CA', 'role': 'Customer service representative'},
     {'name': 'Andrew', 'join_year': 2009, 'location': 'TX', 'role': 'Account manager'},
     {'name': 'James', 'join_year': 2017, 'location': 'NJ', 'role': 'Executive assistant'},
     {'name': 'Paul', 'join_year': 2015, 'location': 'NY', 'role': 'Scheduler'},
     {'name': 'Justin', 'join_year': 2015, 'location': 'NJ', 'role': 'Scheduler'}
     ]

    opts = PipelineOptions(beam_args)
    joinYear = [i for i in ast.literal_eval(custom_args.joinYear)]
    selectCols = [i for i in ast.literal_eval(custom_args.selectCols)]

    with beam.Pipeline(options=opts) as p:
        (p 
        | "Create" >> beam.Create(elements) 
        | "Filter for join year in 2010 and 2015" >> beam.Filter(lambda item: item['join_year'] in joinYear) 
        | "Select name and location columns" >> beam.Map(lambda line : {key:value for (key,value) in line.items() if key in selectCols})
        | beam.Map(print)
        )

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--joinYear",required=True)
    parser.add_argument("--selectCols",required=True)

    my_args, beam_args = parser.parse_known_args()
    run_pipeline(my_args, beam_args)

if __name__ == '__main__':
    main()

我像python filterlist.py --joinYear='[2010,2015]' --selectCols="['name','location']"这样运行上面的代码
在实际的生产使用中,我会从云函数传递这些参数并启动数据流作业。所以我想知道是否有其他更好的方法来做同样的事情,下面是更好的实践?

r6hnlfcb

r6hnlfcb1#

不幸的是,您不能直接将Python列表作为程序参数传递。
你的选择和你的实现正朝着好的方向前进。
如果需要传递ListDictList of Dict等结构,则需要在pipeline选项中将其作为String传递,然后在作业中将此String转换为预期的结构。
它是一种序列化(String)/反序列化(Object)。
如果传递Cloud Function或其他进程,则可以在代码中以编程方式为选项格式化和应用序列化。
例如,对于DictList of Dict,可以用途:

  • 用于序列化的json.dump
  • 用于反序列化的json.loads

相关问题