我想读取一个csv文件,并使用apache beam数据流将其写入BigQuery。为了做到这一点,我需要以字典的形式将数据呈现给BigQuery。我如何使用apache beam转换数据来做到这一点?
我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表。我知道如何在BigQuery中创建数据,这很简单,我不知道的是如何将csv转换为字典。下面的代码是不正确的,但应该给予我想做的事情的想法。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
3条答案
按热度按时间jjhzyzn01#
编辑:从2.12.0版本开始,Beam附带了新的
fileio
转换,允许您从CSV读取,而无需重新实现源代码。你可以这样做:我最近为Apache Beam写了一个测试。你可以看看Github仓库。
这个想法是要有一个返回解析的CSV行的源。您可以通过将
FileBasedSource
类子类化以包含CSV解析来实现这一点。特别是,read_records
函数看起来像这样:ssgvzors2#
作为对巴勃罗帖子的补充,我想分享我自己对他的样本做的一点修改。+1为你!)
变更:
reader = csv.reader(self._file)
至reader = csv.DictReader(self._file)
csv.DictReader
使用CSV文件的第一行作为Dict键。其他行用于使用每行的值填充dict。它会根据列顺序自动将正确的值放到正确的键上。一个小细节是Dict中的每个值都存储为字符串。这可能会与您的BigQuery模式冲突,如果您使用例如。某些字段的整数。因此,你需要照顾适当的铸造后。
ncgqoxb03#
属性错误:module 'apache_beam.io' has no attribute 'TextFileSink' ->是否有此代码的更新?