使用dask从Hive读取数据

vyswwuz2  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(481)

我正在使用 as_pandas 实用程序来自 impala.util 读取数据 dataframe 从Hive获取的窗体。但是,使用pandas,我想我将无法处理大量的数据,而且速度也会变慢。我一直在读关于dask的文章,它为读取大数据文件提供了极好的功能。如何使用它有效地从配置单元获取数据。

def as_dask(cursor):
"""Return a DataFrame out of an impyla cursor.
This will pull the entire result set into memory.  For richer pandas- 
like functionality on distributed data sets, see the Ibis project.

Parameters
----------
cursor : `HiveServer2Cursor`
    The cursor object that has a result set waiting to be fetched.
Returns
-------
DataFrame
"""
    import pandas as pd
    import dask
    import dask.dataframe as dd

    names = [metadata[0] for metadata in cursor.description]
    dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(), 
    columns=names)
    return dd.from_delayed(dfs).compute()
f4t66c6m

f4t66c6m1#

目前还没有直接的方法可以做到这一点。您最好看看dask.dataframe.read\u sql\u table的实现,以及在intake sql中的类似代码—您可能需要一种对数据进行分区的方法,并让每个工作人员通过调用 delayed() . dd.from_delayed 以及 dd.concat 然后可以用来缝合这些碎片。
-编辑-
你的功能有延迟的想法回到前面。在一个操作于单个游标的函数中,您正在延迟并立即实现数据—它不能并行化,如果数据太大(这就是您尝试此操作的原因),它将破坏您的内存。
假设您可以形成一组10个查询,其中每个查询获得不同的数据部分;不要使用偏移量,而是对配置单元索引的某个列使用条件。你想做一些类似的事情:

queries = [SQL_STATEMENT.format(i) for i in range(10)]
def query_to_df(query):
    cursor = impyla.execute(query)
    return pd.DataFrame.from_records(cursor.fetchall())

现在您有了一个返回分区的函数,它不依赖于全局对象—它只接受一个字符串作为输入。

parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)

相关问题