python MySQLDB查询超时

9njqaruj  于 2023-11-16  发布在  Mysql
关注(0)|答案(7)|浏览(120)

我试图在python MySQLDB中对查询强制执行时间限制。我有一种情况,我无法控制查询,但需要确保它们不会超过设置的时间限制。我尝试使用signal.SIGALRM中断调用execute,但这似乎不起作用。信号被发送,但直到execute调用完成后才被捕获。
我写了一个测试用例来证明这种行为:

#!/usr/local/bin/python2.6

import time
import signal

from somewhere import get_dbc

class Timeout(Exception):
    """ Time Exceded """

def _alarm_handler(*args):
    raise Timeout

dbc = get_dbc()

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)

try:
    print "START:  ", time.time()
    dbc.execute("SELECT SLEEP(10)")
except Timeout:
    print "TIMEOUT!", time.time()'

字符串
“SELECT SLEEP(10)”模拟了一个慢查询,但我确实看到了一个实际的慢查询的相同行为。
结果:

START:   1254440686.69
TIMEOUT! 1254440696.69


正如你所看到的,它休眠了10秒钟,然后我得到了一个异常。
问题:
1.为什么我在执行完成后才收到信号?
1.是否有其他可靠的方法来限制查询执行时间?

qlckcl4x

qlckcl4x1#

@nosklo的基于twisted的解决方案是优雅和可行的,但是如果你想避免对twisted的依赖,这个任务仍然是可行的,例如:

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a, 
                                    kwargs=k)
  subproc.start()
  subproc.join(timeout)
  if conn1.poll():
    return conn1.recv()
  subproc.terminate()
  raise TimeoutError("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  return cu.fetchall()

字符串

b1payxdu

b1payxdu2#

使用adbapi。它允许你异步地进行数据库调用。

from twisted.internet import reactor
from twisted.enterprise import adbapi

def bogusQuery():
    return dbpool.runQuery("SELECT SLEEP(10)")

def printResult(l):
    # function that would be called if it didn't time out
    for item in l:
        print item

def handle_timeout():
    # function that will be called when it timeout
    reactor.stop()

dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async")
bogusQuery().addCallback(printResult)
reactor.callLater(4, handle_timeout)
reactor.run()

字符串

fcipmucu

fcipmucu3#

我试过使用signal.SIGALRM中断execute调用,但似乎不起作用。信号被发送,但直到execute调用完成后才被捕获。
mysql库在内部处理中断的系统调用,因此在API调用完成之前,您不会看到SIGALRM的副作用(除非杀死当前线程或进程)
您可以尝试修补MySQL-Python并使用MYSQL_OPT_READ_TIMEOUT选项(在mysql 5.0.25中添加)

mpgws1up

mpgws1up4#

为什么我在执行完成后才收到信号?
查询通过C函数执行,该函数阻止Python VM执行,直到它返回。
是否有其他可靠的方法来限制查询执行时间?
这是(IMO)一个非常丑陋的解决方案,但它确实有效。您可以在单独的进程中运行查询(通过fork()multiprocessing module)。在主进程中运行警报计时器,当您收到它时,发送SIGINTSIGKILL到子进程。如果您使用multiprocessing,您可以使用Process.terminate()方法。

mbjcgjjk

mbjcgjjk5#

通用注解

我最近遇到了同样的问题,我必须满足几个条件:

  • 解决方案必须是线程安全
  • 从同一台机器到数据库的多个连接可能同时处于活动状态,删除确切的一个连接/查询
  • 应用程序包含到许多不同数据库的连接-每个DB主机的可移植处理程序

我们有以下类布局(* 不幸的是,我不能张贴真实的来源 *):

class AbstractModel: pass 
class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host
class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host

字符串
为每个模型创建了多个线程。

Python 3.2解决方案

在我们的应用程序中,* 一个模型=一个数据库 。所以我为每个模型创建了“ 服务连接 *”(这样我们就可以并行连接执行KILL)。因此,如果创建了一个FirstDatabaseModel示例,则创建了2个数据库连接;如果创建了5个示例,则只使用了6个连接:

class AbstractModel:
    _service_connection = None # Formal declaration

    def __init__(self):
        ''' Somehow load config and create connection
        '''
        self.config = # ...
        self.connection = MySQLFromConfig(self.config)
        self._init_service_connection()

        # Get connection ID (pseudocode)
        self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()') 

    def _init_service_connection(self):
        ''' Initialize one singleton connection for model
        '''
        cls = type(self)
        if cls._service_connection is not None:
            return

        cls._service_connection = MySQLFromConfig(self.config)


现在我们需要一个杀手

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)

    # Do your own connection check and renewal
    type(self)._service_connection.execute(sql)

  • 注意:connection.execute =创建游标,执行,关闭游标。*

使用threading.Lock使杀手级线程安全:

def _init_service_connection(self):
    ''' Initialize one singleton connection for model
    '''
    cls = type(self)
    if cls._service_connection is not None:
        return

    cls._service_connection = MySQLFromConfig(self.config)
    cls._service_connection_lock = threading.Lock()

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)
    cls = type(self)

    # Do your own connection check and renewal
    try:
        cls._service_connection_lock.acquire()    
        cls._service_connection.execute(sql)
    finally:
        cls._service_connection_lock.release()


最后使用threading.Timer添加定时执行方法:

def timed_query(self, sql, timeout=5):
    kill_query_timer = threading.Timer(timeout, self._kill_connection)
    kill_query_timer.start()

    try:
        self.connection.long_query() 
    finally:
        kill_query_timer.cancel()

isr3a4wc

isr3a4wc6#

在mysql中,可以设置执行超时。

dbc.execute("SET max_execution_time=%s", (timeout_in_milliseconds,))
dbc.execute("SELECT SLEEP(10)")

字符串
如果timeout_in_milliseconds小于10秒,则SLEEP查询应缩短。

lxkprmvk

lxkprmvk7#

为什么我在执行完成后才收到信号?
等待网络I/O的进程处于不可中断状态(UNIX的东西,与Python或MySQL无关)。它在系统调用完成后获得信号(可能是EINTR错误代码,尽管我不确定)。
是否有其他可靠的方法来限制查询执行时间?
我认为这通常是由外部工具完成的,比如mkill,它监视MySQL长时间运行的查询并杀死它们。

相关问题