Creating a process from a function so that it runs in parallel in Python

Posted by ruyhziif on 2023-01-12 in Python

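I have a function that executes a SELECT SQL query (using PostgreSQL). Now I want to insert the execution time of this query into a table in my database. However, I want to do that in parallel, so that I can carry on with my program and call other functions even while the INSERT query is still running.
I tried using multiprocessing.Process, but my function waits for the process to finish, so I effectively lose the parallelism I wanted.
My code in a nutshell: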

def select_func():
    with connection.cursor() as cursor:
        query = "SELECT * FROM myTable WHERE \"UserName\" = 'Alice'"
        start = time.time()
        cursor.execute(query)
        end = time.time()
        process = Process(target = insert_func, args = (query, (end-start)))
        process.start()
        process.join()
        return cursor.fetchall()
        
def insert_func(query, time):
    with connection.cursor() as cursor:
        query = "INSERT INTO infoTable (\"query\", \"exec_time\")
                VALUES (\"" + query  + "\", \"" + time + "\")"
        cursor.execute(query)
        connection.commit()

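The problem is that this isn't really asynchronous: select_func keeps waiting for insert_func to finish. I want the two functions to run independently, so that select_func can return while insert_func is still running and I can go on calling other functions in my script.
Thanks!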

Answer #1 by jtw3ybtb

Your snippet has quite a few problems, but let's at least try to give you a structure to work toward.

def select_func():
    with connection.cursor() as cursor:  # I don't think the same global connection variable should be used for reading and writing simultaneously
        query = "SELECT * FROM myTable WHERE \"UserName\" = 'Alice'" #quotation issues
        start = time.time()
        cursor.execute(query)
        end = time.time()
        process = Process(target = insert_func, args = (query, (end-start)))
        process.start() #you start the process here BUT
        process.join()  # ...you force Python to wait for it here, so select_func blocks until the INSERT is done
        return cursor.fetchall()
        
def insert_func(query, time):
    with connection.cursor() as cursor:
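        query = "INSERT INTO infoTable (\"query\", \"exec_time\")
                VALUES (\"" + query  + "\", \"" + time + "\")"  # a plain string literal can't span two lines, "..." means an identifier (not a value) in PostgreSQL, and time is a float, not a str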
        cursor.execute(query)
        connection.commit()
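The point of the two comments on `start()` and `join()` can be checked with a small stdlib-only experiment. This is a sketch, not the question's code: `slow_insert` is a hypothetical stand-in for `insert_func` that just sleeps instead of writing to the database, and `blocked_time` is a hypothetical helper that measures how long the caller is held up.

```python
import time
from multiprocessing import Process


def slow_insert(seconds):
    # Hypothetical stand-in for insert_func: sleeps instead of writing to the DB
    time.sleep(seconds)


def blocked_time(join_immediately, seconds=1.0):
    """Return how long the caller is held up before it can carry on."""
    p = Process(target=slow_insert, args=(seconds,))
    t0 = time.perf_counter()
    p.start()  # returns as soon as the child has been launched
    if join_immediately:
        p.join()  # this is what makes the caller wait, as in the question
    waited = time.perf_counter() - t0
    p.join()  # reap the child eventually, after the caller's own work is done
    return waited


if __name__ == "__main__":
    print(f"start() + join(): caller waited {blocked_time(True):.2f}s")
    print(f"start() only:     caller waited {blocked_time(False):.2f}s")
```

With the immediate `join()` the caller waits at least as long as the child runs; without it, the caller only pays for launching the process. On platforms that use the spawn or forkserver start method, the `if __name__ == "__main__":` guard is required so the child does not re-run the top-level code.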

Consider an alternative:

import time
from multiprocessing import Process, Queue

import psycopg2  # assumption: PostgreSQL via psycopg2, since the question uses PostgreSQL; use your own driver/connection

DSN = "dbname=mydb user=me"  # hypothetical connection string, replace with your own


def select_func(user_name):
    # Separate connection for reading: don't share one global connection between the reader and the writer process
    read_con = psycopg2.connect(DSN)
    try:
        with read_con.cursor() as cursor:
            query = 'SELECT * FROM myTable WHERE "UserName" = %s'  # 'Alice' is now passed in as a parameter
            start = time.time()
            cursor.execute(query, (user_name,))
            end = time.time()
            rows = cursor.fetchall()
    finally:
        read_con.close()
    return rows, (query, end - start)  # our tuple has the query at position 0 and the time at position 1


def insert_function(insert_queue):  # the insert you want to parallelize
    # Initialize your 'writer' inside the child process. Note: opening a connection per insert is also possible, but probably not optimal.
    connection = psycopg2.connect(DSN)
    while True:  # we keep pulling from the pipe
        data = insert_queue.get()  # blocks until something is put into the queue
        if data == 'STOP':  # example of a kill instruction to stop our process
            break  # breaks the while loop so the function can exit
        query, exec_time = data  # unpack the tuple built by select_func
        with connection.cursor() as cursor:
            # Parameterized query: the driver handles quoting, and exec_time can stay a float
            cursor.execute(
                'INSERT INTO infoTable ("query", "exec_time") VALUES (%s, %s)',
                (query, exec_time),
            )
        connection.commit()
    connection.close()


if __name__ == '__main__':  # typical Python main guard, required by multiprocessing on spawn-based platforms

    query_pipe = Queue()  # we initialize a Queue here to feed into your inserting function
    process = Process(target=insert_function, args=(query_pipe,))
    process.start()  # start the writer, but do NOT join it yet
    stuff = []
    for i in range(5):
        data, insert_query = select_func('Alice')  # returns the rows plus the (query, time) tuple to insert
        stuff.append(data)
        query_pipe.put(insert_query)  # hand the INSERT off and continue immediately
    #
    # Do other stuff and even put more stuff into the pipe.
    #
    query_pipe.put('STOP')  # we want to stop our process, so we send the stop command
    process.join()  # wait for the writer only at the very end
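The alternative above needs a running PostgreSQL server. To try the same producer/consumer structure without one, here is a self-contained sketch that swaps in the standard-library `sqlite3` module and a temporary database file. The `myTable` schema, the sample rows and the names `insert_worker`, `timed_select` and `main` are all hypothetical, chosen only for the demo.

```python
import os
import sqlite3
import tempfile
import time
from multiprocessing import Process, Queue

STOP = "STOP"  # kill instruction for the writer, as in the answer


def insert_worker(db_path, insert_queue):
    # The writer opens its own connection inside the child process
    con = sqlite3.connect(db_path)
    con.execute('CREATE TABLE IF NOT EXISTS infoTable ("query" TEXT, exec_time REAL)')
    while True:
        item = insert_queue.get()  # blocks until the main process sends something
        if item == STOP:
            break
        query, exec_time = item
        # Parameterized INSERT: the driver does the quoting, exec_time stays a float
        con.execute('INSERT INTO infoTable ("query", exec_time) VALUES (?, ?)', (query, exec_time))
        con.commit()
    con.close()


def timed_select(con, user_name):
    # Reader side: run the SELECT, time it, return the rows plus the record to log
    query = "SELECT * FROM myTable WHERE UserName = ?"
    start = time.perf_counter()
    rows = con.execute(query, (user_name,)).fetchall()
    return rows, (query, time.perf_counter() - start)


def main(db_path, n_selects=5):
    # Hypothetical demo table for the SELECTs to read from
    con = sqlite3.connect(db_path)
    con.execute("CREATE TABLE IF NOT EXISTS myTable (UserName TEXT, Age INTEGER)")
    con.executemany("INSERT INTO myTable VALUES (?, ?)", [("Alice", 30), ("Bob", 25)])
    con.commit()

    q = Queue()
    writer = Process(target=insert_worker, args=(db_path, q))
    writer.start()  # no join here: the loop below keeps running
    results = []
    for _ in range(n_selects):
        rows, record = timed_select(con, "Alice")
        results.append(rows)
        q.put(record)  # hand the INSERT off and continue immediately
    q.put(STOP)
    writer.join()  # wait for the writer only once we are done producing work

    logged = con.execute("SELECT COUNT(*) FROM infoTable").fetchone()[0]
    con.close()
    return results, logged


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        results, logged = main(os.path.join(tmp, "demo.db"))
    print(len(results), "selects done,", logged, "timings logged")
```

The design matches the answer: only the writer process touches `infoTable`, the main process does not call `join()` until it has finished producing work, and the `'STOP'` sentinel lets the writer drain the queue and exit cleanly. Because the queue is consumed in order, every record put before `'STOP'` is inserted before the writer exits.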
