我必须从azure数据湖gen2的databricks中读取数百个avro文件,从每个文件中的body字段提取数据,并将所有提取的数据连接到一个唯一的Dataframe中。关键是要读取的所有avro文件都存储在湖中的不同子目录中,遵循以下模式:
根目录/yyyy/mm/dd/hh/mm/ss.avro
这迫使我循环数据的摄取和选择。我正在使用这个python代码,其中list\u avro\u files是指向所有文件的路径列表:
list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)
有什么办法能更有效地做到这一点吗?如何并行化/加速此过程?
1条答案
按热度按时间l3zydbqr1#
只要你
list_avro_files
可以通过标准通配符语法来表示,您可能可以使用spark自己的能力来并行化读取操作。你只需要指定一个basepath
以及avro文件的文件名模式:而且,如果您发现需要确切地知道任何给定行来自哪个文件,请使用
input_file_name()
用于丰富Dataframe的内置函数: