python Azure数据湖的Pyarrow切片下推

vsmadaxz  于 2023-03-21  发布在  Python
关注(0)|答案(2)|浏览(178)

我想访问Azure数据湖上的Parquet文件,并且只检索一些行。
下面是一个可复制的示例,使用公共数据集:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(
    account_name="azureopendatastorage")

dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

与收集整个数据集相比,收集5行的处理时间是相同的。有没有一种方法可以使用Pyarrow实现切片下推?
以下是我的测试:

dataset_public.to_table()
# 5min 30s

dataset_public.head(5)
# 5min 11s

dataset_public.scanner().head(5)
# 5min 43s

我不确定.head().scanner().head()是否有区别
相关页面:

ukqbszuj

ukqbszuj1#

经过一些调整,我想我得到了你想要的东西。首先,让我们看看你发布的原始代码:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(
    account_name="azureopendatastorage")

dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

查看您提供的路径,您将其指向单个文件而不是整个数据集。添加一些调整:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
dataset_public = ds.dataset('nyctlc/yellow/', filesystem=abfs_public, partitioning='hive')

现在,使用dataset_public.head(5)我得到:

因为我没有给予它排序顺序,所以它只从碰巧是第一个片段的文件中获取前5行(很可能)。
在你的原始代码示例中,你给出的路径使用了puYear=2010puMonth=1,所以我们可以使用它们。因为我们告诉它使用hive分区,所以我们可以确认它选择了数据集是在这些值上分区的:

print(dataset_public.partitioning.schema)
# prints:
# puYear: int32
# puMonth: int32
# -- schema metadata --
# org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 1456

如果我们使用这些字段作为过滤器来获取前5行:

所以这花了1分31秒。但我们可以做得更好!

1.12秒
看,默认的batch_size非常大,我现在忘记它是什么了。但是如果你只想抓取少量的行,你可以调整batch_size和fragment readahead的大小等,以更好地适应你的用例。
如果你查看head()方法的基本API文档,它有一个**kwargs,上面写着“see scanner()method for full parameter description”。如果你转到scanner()方法,它会指向这里:https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_dataset,你可以在这里看到方法的所有可用参数。就像只获取列的子集一样(因为Parquet而非常高效):

我希望这有助于您更好地理解如何利用数据集API和粗糙边缘/技巧来提高性能。

jhdbpxl9

jhdbpxl92#

我花了超过5分钟6:57分钟才用slice pushdown加载公共数据集,请参阅以下内容:-

import pyarrow.dataset as ds

from adlfs import AzureBlobFileSystem

  

abfs_public = AzureBlobFileSystem(

account_name="azureopendatastorage")

  

dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

  

scanner = dataset_public.scanner()

table = scanner.to_table()

subset_table = table.slice(0, 5)

print(subset_table)

输出:-

看起来收集5行数据与收集整个数据集花费的时间相同。因为pyarrow.dataset模块不包括slice pushdown方法,所以在筛选任何行之前,首先将整个数据集加载到内存中。
作为一种解决方法,您可以使用处理结果更快的Pyspark参考如下:-
代码:-

# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = "r"

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)

# SPARK read parquet, note that it won't load any data yet by now
df = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: source')
df.createOrReplaceTempView('source')

# Display top 10 rows
print('Displaying top 10 rows: ')
display(spark.sql('SELECT * FROM source LIMIT 10'))

输出:-

参考:-

azure-docs/dataset-taxi-yellow.md at main · MicrosoftDocs/azure-docs · GitHub

相关问题