pandas 为什么这个Apache Beam管道阅读Excel文件并从中创建.CSV不起作用?

aurhwmvo  于 2023-01-15  发布在  Apache
关注(0)|答案(1)|浏览(113)

我是Apache Beam的新手,在执行这个简单的任务时遇到了以下问题:我试图创建一个新的**.csv文件开始从一个.xlsxExcel文件。要做到这一点,我使用Apache梁与Python 3语言和Pandas**库。我承认,所有的主题对我来说都很新。
我正在研究Google Colab,但我认为这并不像信息那么重要。
我以这种方式导入了Apache Beam和Pandas(!只是向Google Colab给予shell命令的方式):

!{'pip install --quiet apache-beam pandas'}

这是我实现Apache Bean管道的Python代码:

import apache_beam as beam
import pandas as pd

def parse_excel(line):
  # Use the pandas library to parse the line into a DataFrame
  df = pd.read_excel(line)
  print("DATAFRAME")

  # Convert the DataFrame to a list of dictionaries, where each dictionary represents a row in the DataFrame
  # and has keys that are the column names and values that are the cell values
  return [row.to_dict() for _, row in df.iterrows()]

def print_json(json_object):
  # Print the JSON object
  print(json_object)

def run(argv=None):
  print("START run()")
  p = beam.Pipeline()

  # Read the Excel file as a PCollection
  lines = (
             p 
             | 'Read the Excel file' >> beam.io.ReadFromText('Pazienti_export_reduced.xlsx')
             | "Convert to pandas DataFrame" >> beam.Map(lambda x: pd.DataFrame(x))
             | "Write to CSV" >> beam.io.WriteToText(
                'data/csvOutput', file_name_suffix=".csv", header=True
            )
          )
  
  print("after lines pipeline")

  # Parse the lines using the pandas library
  #json_objects = lines | 'ParseExcel' >> beam.Map(parse_excel)

  # Print the values of the json_objects PCollection
  #json_objects | 'PrintJSON' >> beam.ParDo(print_json)
  

if __name__ == '__main__':
  print("START main()")
  print(beam.__version__)
  print(pd.__version__)
  run()

当我运行它时,我没有得到任何错误,但是我的data文件夹仍然是空的。基本上看起来预期的csvOutput.csv输出文件没有在我的管道末端创建。
出了什么问题?我错过了什么?我如何尝试修复我的代码?

5kgi1eie

5kgi1eie1#

您正在定义您的管道,但没有运行它。您需要执行以下任一操作

with beam.Pipeline(...) as p:
  ...
  # p.run() called on __exit__

p = beam.Pipeline(...)
...
p.run().wait_until_finish()

注意,beam.io.ReadFromText不适用于xlsx文件,同样,WriteToText也不接受Pandas Dataframe 的PCollection作为输入(如果Python是强类型的,就更清楚了)。

with beam.Pipeline(...) as p:
  filenames = p | beam.Create(['Pazienti_export_reduced.xlsx', ...])
  rows_as_dicts = filenames | beam.Map(filenames, parse_excel)
  csv_lines = csv_lines | beam.Map(
      lambda row: ','.join(str(row[c]) for c in [COL_NAMES]))
  csv_lines | beam.WriteToText('out.csv', header=...)

使用光束dataframes API更容易

from apache_beam.dataframe.io import read_excel

with beam.Pipeline(...) as p:
  data = p | read_excel("/path/to/*.xlsx")
  data.to_csv("out.csv")

相关问题