python redis和celery 太多客户端,每次执行时都有不同的错误|任务使用pymsql连接到mysql

ipakzgxi  于 2021-06-19  发布在  Mysql
关注(0)|答案(2)|浏览(380)

我目前正在开发一个应用程序,它必须处理几个长时间运行的任务。我正在使用 python 3 , flask , celery , redis .
我在localhost上有一个可行的解决方案,但是在heroku上有很多错误,每次执行应用程序都会触发一组不同的错误。我知道这不可能是随机的,所以我想知道从哪里开始寻找。
我觉得redis肯定出了问题,我正在努力了解客户是什么,他们来自哪里,但我找不到关于这个主题的官方文档或解释。
问题:
如果redis服务器启动了(甚至在localhost上),很多客户端都连接了,尽管我什么都没做。在heroku上(我使用的是heroku redis),我总是有6个客户机,在localhost上有11个客户机。
我做了一些研究,我可以用以下方式展示它们:

if 'DYNO' in os.environ:
    redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
else:
    redis_db = redis.StrictRedis()

# see what keys are in Redis

all_keys = redis_db.keys()
print (all_keys)

all_clients = redis_db.client_list()
print (all_clients)

我看到所有这些客户,但那里的信息对我一点帮助都没有。它们是什么?他们为什么在那里?他们从哪里来?
所有heroku redis插件都有一个客户端限制,所以我需要理解并优化它。首先我想 clientsnumber == tasknumber ,但不是这样。
我总共定义了12个任务,但是我现在测试了2个任务(两个任务都在30秒内完成)。
当我在localhost上执行任务时,客户机从11个增加到16个。如果我从16岁到18岁再执行一次,然后他们总是保持在18岁,不管我执行任务的频率有多高。
这是怎么回事?我有两个任务,为什么客户从11增加到16,然后从16增加到18?为什么任务完成后不关闭?
我现在正在为整个问题奋斗几天(尽管它在localhost上总是完美的工作),所以欢迎任何帮助或想法。我需要开始寻找一些地方,所以目前我正在努力了解客户。
编辑:
我安装了flower并尝试在localhost上监视这两个任务,一切看起来都很好。它处理两个任务,都在几秒钟内成功。返回值是正确的(但在localhost上总是很有效)。
但问题是,在我开始花客户数量跳到30。我还是不知道:什么是客户?根据我生成的客户端数量,我需要一个100美元的附加组件来处理两个任务,这需要几秒钟才能完成,这不可能是真的,我仍然认为redis有问题,即使是在localhost上。
我的redis设置非常简单:

if 'DYNO' in os.environ:
    app.config['CELERY_BROKER_URL'] = 'redis://[the full URL from the redis add-on]'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://[the full URL from the redis add-on]'
else:
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])

下面是一个任务示例:

@celery.task(bind=True)
def get_users_deregistrations_task(self, g_start_date, g_end_date):

    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    a1 = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    a2 = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    a3 = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if a1 is None:
        a1 = 0

    if a2 is None:
        a2 = 0

    if a3 is None:
        a3 = 0

    amount = a1 + a2 + a3

    return {'some_value' : amount}

# Selects user deregistrations between selected dates

@app.route('/get-users-deregistration', methods=["POST"])
@basic_auth.required
@check_verified
def get_users_deregistrations():
    if request.method == "POST":

        # init task
        task = get_users_deregistrations_task.apply_async([session['g_start_date'], session['g_end_date']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_users_deregistrations', task_id=task.id)}

@app.route('/status/<task_id>')
def taskstatus_get_users_deregistrations(task_id):
    task = get_users_deregistrations_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info['current'],
            'total': task.info['total'],
            'status': 'Finished',
            'statistic': task.info['statistic'],
            'final_dataset': task.info     
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        print ('in else')
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return json.dumps(response)

编辑:
这是我给heroku的程序文件:

web: gunicorn stats_main:app
worker: celery worker -A stats_main.celery --loglevel=info

编辑
我想问题可能是连接池(在redis端),我没有正确使用它。
我还找到了celery 的一些配置,并添加了它们:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'], redis_max_connections=20, BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None)

我用这些配置再次将所有内容上传到heroku。我仍然只测试了两个任务,这两个任务都很快。
我已经连续10次在heroku上执行任务,7次成功。3次看起来他们完成得太早:返回的结果是错误的(正确的结果是f.e.30000,返回3次18000)。
客户机很快跳到了20,但从来没有超过20,所以至少最大客户机错误和redis连接丢失错误得到了解决。
现在的大问题是任务可能会过早完成,返回的结果是否正确非常重要,性能一点也不重要。
编辑
没关系,什么都解决不了,一切都是随机的。我加了两个 print() 在其中一个任务中进一步调试并上传到heroku。在两次执行之后,我再次看到到redis的连接丢失,达到了客户端的最大数量(尽管我的redismonitor插件显示客户端从未超过20个)
编辑
大量客户端可能是由空闲客户端造成的,这些客户端由于某种原因从未关闭(在heroku的博客文章中发现):
默认情况下,redis永远不会关闭空闲连接,这意味着如果您不显式关闭redis连接,您将把自己锁定在示例之外。
为了确保不会发生这种情况,heroku redis将默认连接超时设置为300秒。此超时不适用于非发布/订阅客户端和其他阻止操作。
我现在为空闲客户机添加了一个kill函数,正好在我的每个任务之前:

def kill_idle_clients():
    if 'DYNO' in os.environ:
        redis_db = redis.StrictRedis(host='HOST', port=15249, password='REDISDBPW')
    else:
        redis_db = redis.StrictRedis()

    all_clients = redis_db.client_list()
    counter = 0
    for client in all_clients:
        if int(client['idle']) >= 15:
            redis_db.client_kill(client['addr'])
            counter += 1

    print ('killing idle clients:', counter)

在任务开始之前,它会关闭所有空闲时间超过15秒的客户端。它在localhost上再次工作(但毫不奇怪,它总是在localhost上工作)。我的客户少了,但在heroku上,它现在只起到了2/10的作用。8次任务又太早完成。也许那些闲着的客户不是真的闲着,我不知道。
它也几乎不可能测试,因为每个任务的执行都有不同的结果(与redis失去连接、达到客户端限制、过早完成、工作完美)。
编辑
似乎celery 的设置一直被忽视。我一直对此持怀疑态度,并决定通过添加一些随机参数和将值更改为“无意义”来测试它。我重新启动了celery 工厂。
我本以为会看到一些错误,但它却像什么都没发生一样工作。
在这些无感配置下,一切都像以前一样工作:

celery = Celery(app.name, broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'], redis_max_connections='pups', BROKER_TRANSPORT_OPTIONS = {
    'max_connections': 20,
}, broker_pool_limit=None, broker_connection_timeout='pups', pups="pups")
celery.conf.broker_transport_options = {'visibility_timeout': 'pups'}

编辑
我改变了为celery 加载配置的方式(从一个单独的配置文件)。似乎现在起作用了,但问题还是一样。

celery_task = Celery(broker=app.config['REDIS_URL'], backend=app.config['REDIS_URL'])
celery_task.config_from_object('celeryconfig')

编辑
通过这些配置,我成功地将localhost上所有任务的客户端数量限制为18个(我尝试了所有12个任务)。然而在heroku上它“不知怎么”起作用了。客户少了,但一次达到20个,虽然我以为不能超过18个(我在heroku上测试了4个任务)。
在heroku上测试所有12个任务会触发许多不同的sql错误。我现在比以前更糊涂了。似乎同一个任务执行了多次,但我只看到12个任务url。
我认为因为sql错误是f.e.:

sqlalchemy.exc.InternalError: (pymysql.err.InternalError) Packet sequence number wrong - got 117 expected 1

或者

sqlalchemy.exc.InterfaceError: (pymysql.err.InterfaceError) (0, '')

Multiple rows were found for one()

我用4个任务在heroku上测试了几次,有几次返回了任务结果,但结果非常奇怪。
这一次任务没有过早完成,但返回的值增加了,看起来任务a返回了2次值并求和。
示例:任务a必须返回10k,但返回20k,因此该任务已执行两次,结果已求和。
这是我目前的配置。我仍然不完全理解数学,但我认为它(对于客户数量而言):

max-conncurency * CELERYD_MAX_TASKS_PER_CHILD

在localhost上,我发现了一个新的cli命令来检查worker stats,并且 max-conncurecy=3 以及 CELERYD_MAX_TASKS_PER_CHILD=6 cli命令:

celery -A stats_main.celery_task inspect stats

我当前的配置:
辅助进程启动:

celery worker -A stats_main.celery_task --loglevel=info --autoscale=10,3

配置:

CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)

编辑
看到所有这些sql错误,我决定研究一个完全不同的方向。我的新理论是 MySQL 问题。
我调整了与mysql服务器的连接,如这个问题的答案所述。
我还发现pymsql threadsafety=1 ,我还不知道这是否是一个问题,但似乎mysql与连接和连接池有关。
目前,我还可以说内存不是问题,因为如果包太大,它不应该在localhost上工作,这意味着我离开了 max_allowed_packet 默认值为4mb左右。
我还创建了3个虚拟任务,它们在不连接外部mysql数据库的情况下进行一些简单的计算。我现在已经在heroku上执行了5次,没有错误,结果总是正确的,所以我假设问题不是celery ,redis,而是mysql,尽管我不知道为什么它会在localhost上工作。也许这三者的结合导致了heroku的问题。
编辑
我调整了我的js文件。现在每个任务都被一个接一个地调用,这意味着它们不是异步的(我仍然使用celery 的) apply_async 因为 apply (不起作用)
所以这是一个艰难的解决办法。我只是创造了一个 var 对于每个任务,f.e。 var task_1_rdy = false; 我还创建了一个函数,它每2秒运行一次,检查一个任务是否就绪,如果就绪,它将启动下一个任务。我想很容易理解我在这里做了什么。
在heroku上进行了测试,没有任何错误,即使有多个任务,所以问题可能已经解决了。我需要做更多的测试,但看起来很有希望。ofc公司。我没有使用异步功能,一个接一个地运行任务可能会有最差的性能,但它现在可以工作了。我将在周一对性能差异进行基准测试并更新问题。
编辑
我今天做了很多测试。直到任务完成所需的时间是相同的(同步与异步),我不知道为什么,但它是相同的。
在heroku上处理所有12个任务并选择一个大时间范围(大时间范围=任务需要更长时间,因为要处理的数据更多):
同样,任务结果不精确,返回值错误,只是有点错误,但错误,因此不可靠,即任务a必须返回20k,在heroku上返回19500。我不知道怎么可能数据丢失/任务过早返回,但两周后我会放弃,并尝试使用完全不同的系统。

vwkv1x7d

vwkv1x7d1#

听起来像是使用celery 工人redis作为消息队列来创建restapi。以下是chk列表:
1在客户端中,逻辑完成后是否关闭了连接
2.celery 会给新工人带来麻烦,工人们可能会惹麻烦,试着用celery 监控花
3确保你的客户完成任务,试着调试打印一些东西,有时登台和本地有网络问题,这是阻止你结束celery 任务
4如果您正在使用redis处理celery 味精队列,请尝试监视队列的数量,也许它们会自动放大?

azpvetkf

azpvetkf2#

现在我60%确定是您的任务占用了太长时间,服务器无法在默认的web请求返回时间内响应。70%/30%的方法适用于网络速度非常快的本地机器。在云平台上,延迟是个问题,有时它会影响你的程序。在此之前,如果 celery worker 失败了w

相关问题