如何在databricks中迭代以读取存储在数据湖中不同子目录中的数百个文件?

qv7cva1a  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(458)

我必须从azure数据湖gen2的databricks中读取数百个avro文件,从每个文件中的body字段提取数据,并将所有提取的数据连接到一个唯一的Dataframe中。关键是要读取的所有avro文件都存储在湖中的不同子目录中,遵循以下模式:
根目录/yyyy/mm/dd/hh/mm/ss.avro
这迫使我循环数据的摄取和选择。我正在使用这个python代码,其中list\u avro\u files是指向所有文件的路径列表:

list_data = []

for file_avro in list_avro_files:
  df = spark.read.format('avro').load(file_avro)
  data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
  list_data.append(data1)

data = reduce(DataFrame.unionAll, list_data)

有什么办法能更有效地做到这一点吗?如何并行化/加速此过程?

l3zydbqr

l3zydbqr1#

只要你 list_avro_files 可以通过标准通配符语法来表示,您可能可以使用spark自己的能力来并行化读取操作。你只需要指定一个 basepath 以及avro文件的文件名模式:

scala> var df = spark.read
                 .option("basepath","/user/hive/warehouse/root")
                 .format("avro")
                 .load("/user/hive/warehouse/root/*/*/*/*.avro")

而且,如果您发现需要确切地知道任何给定行来自哪个文件,请使用 input_file_name() 用于丰富Dataframe的内置函数:

scala> df = df.withColumn("source",input_file_name())

相关问题