rabbitmq nodejs +Pythoncelery :无效任务错误:任务关键字参数必须是Map

j2cgzkjk  于 2023-03-12  发布在  RabbitMQ
关注(0)|答案(1)|浏览(202)

我有一个python celery应用程序作为客户端,任务是抓取电影,连接到rabbitmq服务器。另一方面,nodeJS后端将负责接收HTTP请求并调用抓取任务(使用amqplib将任务排队)。
celery 应用程序文件如下所示:

from celery import Celery
app = Celery('scraper', broker='amqps://')

@app.task(serializer='json')
def scrapNewMovies(url, limit):
    print("Recieved")
    return True

nodejs应用程序文件如下所示:

const {delayScrapNewMovies} = require('./celery/tasks');
const express = require("express");
const app = express()

app.get("", async (req, res)=> {

  try{
    await delayScrapNewMovies()

    return res.json({"Hello":"There"})
  } catch( err ){
    console.log(err)
  }
})

app.listen(8000)

节点延迟任务函数:

const amqp = require('amqplib');

async function delayScrapNewMovies() {

  const connection = await amqp.connect('amqps://');
  const channel = await connection.createChannel();
  const queueName = 'celery';
  const headers = {
    'task': 'app.celery.scrapNewMovies',
    'id': '123',
  }

  const properties = {
    'correlation_id':'123',
    'content-type': 'application/json',
    'content-encoding': 'utf-8',
  }
  const message = {
    task: 'app.celery.scrapNewMovies',
    args: [],
    kwargs: {
      "url": "https://example.com/movies",
      "limit": 10
    },
    id: '123',
  };


  await channel.assertQueue(queueName);
  channel.sendToQueue(queueName, Buffer.from(JSON.stringify(message)),{ headers:headers, properties:properties });

}
  
module.exports = {delayScrapNewMovies}

现在一切都运行良好,celery和node运行完美,但每当我点击调用delayScrapNewMovies的请求时,我会从celery“worker logs”中得到以下错误:

Received invalid task message: Task keyword arguments must be a mapping
The message has been ignored and discarded.

Please ensure your message conforms to the task
message protocol as described here:
http://docs.celeryq.org/en/latest/internals/protocol.html

The full contents of the message body was:
b'{"task":"app.celery.scrapNewMovies","args":[],"kwargs":{"url":"https://example.com/movies","limit":10},"id":"123"}' (114b)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 29, in hybrid_to_proto2
    args, kwargs = body.get('args', ()), body.get('kwargs', {})
AttributeError: 'str' object has no attribute 'get'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 596, in on_task_received
    strategy(
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 141, in task_message_handler
    body, headers, decoded, utc = hybrid_to_proto2(message,
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 34, in hybrid_to_proto2
    raise InvalidTaskError(
celery.exceptions.InvalidTaskError: Task keyword arguments must be a mapping

显然,问题与发送到队列的消息的格式有关。我尝试以许多不同的格式发送它,我也尝试使用不同的celery 序列化程序,但仍然得到同样的错误。

qojgxg4l

qojgxg4l1#

问题解决

正如预期的那样,消息格式不正确,需要在选项字段中指定内容类型/编码,并且还需要使用publish方法而不是sendQueue。
延迟函数应如下所示:

async function delayScrapNewMovies() {

  const connection = await amqp.connect('amqps://');
  const channel = await connection.createChannel();
  const queueName = 'celery';

  const headers = {
    'task': 'app.celery.scrapNewMovies',
    'id': '123',
    "lang": "py",
    "argsrepr": '',
    "kwargsrepr": "{}"
  }

  const body = {
    args: [],
    kwargs:{"url": "https://example.com/movies","limit": 10}
  }

  const options = {
    headers:headers, 
    contentType:'application/json', 
    contentEncoding:'utf-8',
    deliveryMode: 2

  }
  await channel.assertQueue(queueName);
  channel.publish("",queueName, Buffer.from(JSON.stringify(body)), options);

}

相关问题