elasticsearch 减少Celery任务记录

sh7euo9m  于 2022-11-02  发布在  ElasticSearch
关注(0)|答案(2)|浏览(130)

我们使用Celery来管理Python Django应用程序中的任务。
我们还使用Django structlog来处理Celery打印的日志消息。
我们通过Logstash将这些消息摄取到ElasticSearch,并注意到我们得到的绝大多数消息都是这样的2:
已接收任务:%s中的文件夹
任务%(name)s[%(id)s]已在%(runtime)ss中成功:%(返回值)s
Celery使用INFO日志记录级别转储它们。
是否有办法将这些消息更改为DEBUG级别?
这样,我们仍然可以使用-l INFO运行celery,并看到其他INFO消息,但看不到这些消息?

更新

对于第二条日志消息。
快速阅读以下内容后:

如果对Celery、Django和structlog没有太多了解,这样的东西能起作用吗?

LOGGING = {
...
'loggers': {
        'celery.app.trace': {
          'handlers': ['null'],
          'propagate': False,
         }
...
}
wmomyfyw

wmomyfyw1#

我只能忽略这两条信息:

初始化.py

import celery
from unittest import mock

app = celery.Celery()

from celery.app import trace
old_info = trace.info
trace.info = mock.Mock()

class MyTask(celery.Task):
    Strategy = '61242484.strategy:my_default'

@app.task(base=MyTask)
def test():
    print("123")

test.apply_async()

strategy.py

from unittest import mock
from celery.worker.strategy import default

def my_default(*args,**kwargs):
    kwargs['info'] = mock.Mock()
    return default(*args,**kwargs)
7cjasjjr

7cjasjjr2#

@Anton Pomieshchenko,谢谢你的帮助,这对我来说很有用。但是,我发现my_default方法有一个bug
有了这个:

def strategy(*args,**kwargs):
    kwargs['info'] = mock.Mock()

    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
    logger.info(f'test: {list(args)}')
    logger.info(f'test: {[type(arg) for arg in list(args)]}')

    return celery.worker.strategy.default(*args,**kwargs)

我明白了:

INFO 2022-10-16 00:19:40,673 celery_app 41 139675281467200 test: [<@task: add_test_iterator of napsedjango at 0x7f08ac73f610>, <@task: add_test_iterator of napsedjango at 0x7f08ac73f610>, <Celery napsedjango at 0x7f08ac73f610>, <Consumer: celery@06d4f16e55fb (running)>]
INFO 2022-10-16 00:19:40,674 celery_app 41 139675281467200 test: [<class 'napsedjango._users.tasks.add_test_iterator'>, <class 'napsedjango._users.tasks.add_test_iterator'>, <class 'celery.app.base.Celery'>, <class 'celery.worker.consumer.consumer.Consumer'>]
CRITICAL 2022-10-16 00:19:40,674 worker 41 139675281467200 Unrecoverable error: TypeError("default() got multiple values for argument 'info'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/tasks.py", line 26, in start
    c.update_strategies()
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 561, in update_strategies
    self.strategies[name] = task.start_strategy(self.app, self)
  File "/usr/local/lib/python3.10/site-packages/celery/app/task.py", line 412, in start_strategy
    return instantiate(self.Strategy, self, app, consumer,**kwargs)
  File "/usr/local/lib/python3.10/site-packages/celery/utils/imports.py", line 49, in instantiate
    return symbol_by_name(name)(*args,**kwargs)
  File "/app/config/celery_app.py", line 44, in strategy
    return celery.worker.strategy.default(*args,**kwargs)
TypeError: default() got multiple values for argument 'info'

在第1行和第2行(logger),我们可以看到在args中有两次任务。
要解决此问题,请执行以下操作:

def strategy(*args,**kwargs):
    kwargs['info'] = mock.Mock()
    return celery.worker.strategy.default(*args[1:],**kwargs)

相关问题