我有一个ApacheBeam管道,它将一个列表作为参数,并在Filter and Map函数中使用它。由于这些列表可以作为字符串使用,我已经使用ast.literal_eval对它们进行了转换。有没有其他更好的方法来做同样的事情?
import argparse
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
def run_pipeline(custom_args, beam_args):
elements = [
{'name': 'Jim', 'join_year': 2010, 'location': 'LA', 'role': 'Executive assistant'},
{'name': 'Tim', 'join_year': 2015, 'location': 'NY', 'role': 'Account manager'},
{'name': 'John', 'join_year': 2010, 'location': 'LA', 'role': 'Customer service representative'},
{'name': 'Bob', 'join_year': 2020, 'location': 'NJ', 'role': 'Customer service representative'},
{'name': 'Michael', 'join_year': 2019, 'location': 'CA', 'role': 'Scheduler'},
{'name': 'Adam', 'join_year': 2010, 'location': 'CA', 'role': 'Customer service representative'},
{'name': 'Andrew', 'join_year': 2009, 'location': 'TX', 'role': 'Account manager'},
{'name': 'James', 'join_year': 2017, 'location': 'NJ', 'role': 'Executive assistant'},
{'name': 'Paul', 'join_year': 2015, 'location': 'NY', 'role': 'Scheduler'},
{'name': 'Justin', 'join_year': 2015, 'location': 'NJ', 'role': 'Scheduler'}
]
opts = PipelineOptions(beam_args)
joinYear = [i for i in ast.literal_eval(custom_args.joinYear)]
selectCols = [i for i in ast.literal_eval(custom_args.selectCols)]
with beam.Pipeline(options=opts) as p:
(p
| "Create" >> beam.Create(elements)
| "Filter for join year in 2010 and 2015" >> beam.Filter(lambda item: item['join_year'] in joinYear)
| "Select name and location columns" >> beam.Map(lambda line : {key:value for (key,value) in line.items() if key in selectCols})
| beam.Map(print)
)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--joinYear",required=True)
parser.add_argument("--selectCols",required=True)
my_args, beam_args = parser.parse_known_args()
run_pipeline(my_args, beam_args)
if __name__ == '__main__':
main()
我像python filterlist.py --joinYear='[2010,2015]' --selectCols="['name','location']"
这样运行上面的代码
在实际的生产使用中,我会从云函数传递这些参数并启动数据流作业。所以我想知道是否有其他更好的方法来做同样的事情,下面是更好的实践?
1条答案
按热度按时间r6hnlfcb1#
不幸的是,您不能直接将
Python
列表作为程序参数传递。你的选择和你的实现正朝着好的方向前进。
如果需要传递
List
、Dict
、List of Dict
等结构,则需要在pipeline选项中将其作为String
传递,然后在作业中将此String
转换为预期的结构。它是一种序列化(String)/反序列化(Object)。
如果传递
Cloud Function
或其他进程,则可以在代码中以编程方式为选项格式化和应用序列化。例如,对于
Dict
或List of Dict
,可以用途:json.dump
json.loads