如何使用impala和python代码并发运行查询?

xnifntxz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(342)

上下文

我使用python(3.7)在hadoop服务器上运行几个查询。
经过几次测试,我认为impala是查询数据库最有效的引擎。因此,我使用ibis框架设置了一个连接,以强制使用impala(默认情况下使用hive)。
考虑到查询的数量,我尝试同时运行这些查询。
我想我已经接近了,但是当我尝试使用ibis共享到服务器的连接时,遇到了一个问题,我启动了多个进程。
我对python还很陌生,但我将尽我所能清楚地解释我的问题,并使用正确的词汇表。请事先原谅我的任何错误。。。!

如何提交查询

对于提交我的查询,代码如下所示:
与数据库的连接:
hdfs=ibis.hdfs\u connect(host='x.x.x.x',port=y)client=ibis.impala.connect(host='x.x.x',port=y,hdfs\u client=hdfs)
创建查询(多次执行):
query=“选择…”。。。来自。。。在哪里……”
发送查询并检索结果(针对每个查询完成):
query=self.client.sql(query)data=query.execute(limit=none)

为了同时运行这些查询,我们做了什么

目前,我已经使用多处理创建了一个process类,并将启用连接的client参数(至少,我认为)和一个包含配置要在服务器上运行的查询所需信息的列表传递给它:

import multiprocessing

class ParallelDataRetrieving(multiprocessing.Process):

    """Process in charge of retrieving the data."""

    def __init__(self,client,someInformations):

    multiprocessing.Process.__init__(self)

    self.client = client
    self.someInformations = someInformations

def run(self):

    """Code to run during the execution of the process."""

    cpt1 = 0

    while cpt1 < len(someInformations):

        query = Use someInformations[cpt1] to create the query.

    query = self.client.sql(query)
    data = query.execute(limit = None)

    Some work on the data...

    return 0

然后,我(尝试)从主脚本建立连接,并使用此连接启动几个进程:

hdfs = ibis.hdfs_connect(host='X.X.X.X', port=Y)
client = ibis.impala.connect(host='X.X.X.X',port=Y,hdfs_client=hdfs)

process_1 = ParallelDataRetrieving(client,someInformations)
process_1.start()
process_2 = ...

但是这个代码不起作用。我得到错误“typeerror:cannotpickle\u thread.lock objects”。
据我所知,这是因为多处理使用pickle来“封装”参数,并将其传输到进程(其内存在windows上单独运行)。而且似乎不可能pickle“client”参数。
然后,我在网上找到了几个想法,试图解决这个问题,但没有一个似乎适用于我的具体情况(朱鹮, Impala …):
我试图在进程对象的run方法中直接创建连接(这意味着每个进程有一个连接):这会导致“brokenpipeerror:[errno 32]broken pipe”
我尝试使用multiprocessing.sharedTypes.rawvalue,但是如果这是正确的解决方案,我不太确定我是否在代码中正确地实现了它。。。
这就是我目前的情况。我将继续尝试解决这个问题,但作为python的一种“新人”,以及数据库查询的多处理,我认为一个更高级的用户可能会帮助我!
提前感谢您花时间处理此请求!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题