I am running the code below.
list_of_paths is a list containing paths to Avro files. For example,
['folder_1/folder_2/0/2020/05/15/10/41/08.avro', 'folder_1/folder_2/0/2020/05/15/11/41/08.avro', 'folder_1/folder_2/0/2020/05/15/12/41/08.avro']
Note: the paths above are stored in Azure Data Lake Storage, and the process below runs in Databricks.
import time

# Authenticate against the storage account and disable Arrow conversion
spark.conf.set("fs.azure.account.key.{0}.dfs.core.windows.net".format(storage_account_name), storage_account_key)
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
begin_time = time.time()
for i in range(len(list_of_paths)):
    try:
        read_avro_data, avro_decoded = None, None
        # Read one path per iteration from Azure Data Lake ("abfss")
        read_avro_data = spark.read.format("avro").load("abfss://{0}@{1}.dfs.core.windows.net/{2}".format(storage_container_name, storage_account_name, list_of_paths[i]))
    except Exception as e:
        custom_log(e)
Schema
read_avro_data.printSchema()
root
|-- SequenceNumber: long (nullable = true)
|-- Offset: string (nullable = true)
|-- EnqueuedTimeUtc: string (nullable = true)
|-- SystemProperties: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- member0: long (nullable = true)
| | |-- member1: double (nullable = true)
| | |-- member2: string (nullable = true)
| | |-- member3: binary (nullable = true)
|-- Properties: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- member0: long (nullable = true)
| | |-- member1: double (nullable = true)
| | |-- member2: string (nullable = true)
| | |-- member3: binary (nullable = true)
|-- Body: binary (nullable = true)
# this is the content of the AVRO file.
Number of rows and columns
print("ROWS: ", read_avro_data.count(), ", NUMBER OF COLUMNS: ", len(read_avro_data.columns))
ROWS: 2 , NUMBER OF COLUMNS: 6
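What I want is to stop reading one Avro file per iteration, which gives me 2 rows of content per iteration. Instead, I want to read all the Avro files at once, so that my final Spark DataFrame contains 2 x 3 = 6 rows of content.
Is this possible with spark.read? Something like the following: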
spark.read.format("avro").load("abfss://{0}@{1}.dfs.core.windows.net/folder_1/folder_2/0/2020/05/15/*")
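[Update] Sorry for the confusion about the wildcard (*). It assumes that all the Avro files are in the same folder. However, I have one folder per Avro file: 3 Avro files, 3 folders. In this case, the wildcard does not work. The solution, as given in the answer below, is to pass a list [] of path names.
Thank you in advance for your help and suggestions.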
1 Answer
load(path=None, format=None, schema=None,**options)
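This method accepts either a single path or a list of paths, so you can pass your list of paths directly to load() instead of looping over it.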
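A minimal sketch of that idea, assuming the same storage_container_name, storage_account_name and list_of_paths variables as in the question. The helper names build_abfss_paths and read_all_avro are hypothetical and only used for illustration; the one real API call is spark.read.format("avro").load(...), which takes a list of paths.

```python
def build_abfss_paths(container, account, relative_paths):
    """Turn container-relative paths into full abfss:// URIs.

    Hypothetical helper: it only formats strings, using the same URI
    layout as the load() call in the question.
    """
    prefix = "abfss://{0}@{1}.dfs.core.windows.net/".format(container, account)
    return [prefix + p.lstrip("/") for p in relative_paths]


def read_all_avro(spark, container, account, relative_paths):
    """Load every listed Avro file into one DataFrame with a single load() call.

    `spark` is assumed to be an existing SparkSession (Databricks provides one).
    """
    full_paths = build_abfss_paths(container, account, relative_paths)
    # DataFrameReader.load accepts a list of paths as well as a single string.
    return spark.read.format("avro").load(full_paths)


# Usage in the notebook (assumes the variables from the question exist):
# read_avro_data = read_all_avro(
#     spark, storage_container_name, storage_account_name, list_of_paths
# )
# print("ROWS: ", read_avro_data.count())
```

With the three two-row files from the question, the combined DataFrame would be expected to hold all 6 rows, provided the files share the same schema.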