我是Apache Beam的新手,在执行这个简单的任务时遇到了以下问题:我试图创建一个新的**.csv文件开始从一个.xlsxExcel文件。要做到这一点,我使用Apache梁与Python 3语言和Pandas**库。我承认,所有的主题对我来说都很新。
我正在研究Google Colab,但我认为这并不像信息那么重要。
我以这种方式导入了Apache Beam和Pandas(!只是向Google Colab给予shell命令的方式):
!{'pip install --quiet apache-beam pandas'}
这是我实现Apache Bean管道的Python代码:
import apache_beam as beam
import pandas as pd
def parse_excel(line):
# Use the pandas library to parse the line into a DataFrame
df = pd.read_excel(line)
print("DATAFRAME")
# Convert the DataFrame to a list of dictionaries, where each dictionary represents a row in the DataFrame
# and has keys that are the column names and values that are the cell values
return [row.to_dict() for _, row in df.iterrows()]
def print_json(json_object):
# Print the JSON object
print(json_object)
def run(argv=None):
print("START run()")
p = beam.Pipeline()
# Read the Excel file as a PCollection
lines = (
p
| 'Read the Excel file' >> beam.io.ReadFromText('Pazienti_export_reduced.xlsx')
| "Convert to pandas DataFrame" >> beam.Map(lambda x: pd.DataFrame(x))
| "Write to CSV" >> beam.io.WriteToText(
'data/csvOutput', file_name_suffix=".csv", header=True
)
)
print("after lines pipeline")
# Parse the lines using the pandas library
#json_objects = lines | 'ParseExcel' >> beam.Map(parse_excel)
# Print the values of the json_objects PCollection
#json_objects | 'PrintJSON' >> beam.ParDo(print_json)
if __name__ == '__main__':
print("START main()")
print(beam.__version__)
print(pd.__version__)
run()
当我运行它时,我没有得到任何错误,但是我的data文件夹仍然是空的。基本上看起来预期的csvOutput.csv输出文件没有在我的管道末端创建。
出了什么问题?我错过了什么?我如何尝试修复我的代码?
1条答案
按热度按时间5kgi1eie1#
您正在定义您的管道,但没有运行它。您需要执行以下任一操作
或
注意,
beam.io.ReadFromText
不适用于xlsx文件,同样,WriteToText
也不接受Pandas Dataframe 的PCollection作为输入(如果Python是强类型的,就更清楚了)。使用光束dataframes API更容易