我有一个python celery应用程序作为客户端,任务是抓取电影,连接到rabbitmq服务器。另一方面,nodeJS后端将负责接收HTTP请求并调用抓取任务(使用amqplib将任务排队)。
celery 应用程序文件如下所示:
from celery import Celery
app = Celery('scraper', broker='amqps://')
@app.task(serializer='json')
def scrapNewMovies(url, limit):
print("Recieved")
return True
nodejs应用程序文件如下所示:
const {delayScrapNewMovies} = require('./celery/tasks');
const express = require("express");
const app = express()
app.get("", async (req, res)=> {
try{
await delayScrapNewMovies()
return res.json({"Hello":"There"})
} catch( err ){
console.log(err)
}
})
app.listen(8000)
节点延迟任务函数:
const amqp = require('amqplib');
async function delayScrapNewMovies() {
const connection = await amqp.connect('amqps://');
const channel = await connection.createChannel();
const queueName = 'celery';
const headers = {
'task': 'app.celery.scrapNewMovies',
'id': '123',
}
const properties = {
'correlation_id':'123',
'content-type': 'application/json',
'content-encoding': 'utf-8',
}
const message = {
task: 'app.celery.scrapNewMovies',
args: [],
kwargs: {
"url": "https://example.com/movies",
"limit": 10
},
id: '123',
};
await channel.assertQueue(queueName);
channel.sendToQueue(queueName, Buffer.from(JSON.stringify(message)),{ headers:headers, properties:properties });
}
module.exports = {delayScrapNewMovies}
现在一切都运行良好,celery和node运行完美,但每当我点击调用delayScrapNewMovies
的请求时,我会从celery“worker logs”中得到以下错误:
Received invalid task message: Task keyword arguments must be a mapping
The message has been ignored and discarded.
Please ensure your message conforms to the task
message protocol as described here:
http://docs.celeryq.org/en/latest/internals/protocol.html
The full contents of the message body was:
b'{"task":"app.celery.scrapNewMovies","args":[],"kwargs":{"url":"https://example.com/movies","limit":10},"id":"123"}' (114b)
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 29, in hybrid_to_proto2
args, kwargs = body.get('args', ()), body.get('kwargs', {})
AttributeError: 'str' object has no attribute 'get'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 596, in on_task_received
strategy(
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 141, in task_message_handler
body, headers, decoded, utc = hybrid_to_proto2(message,
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/celery/worker/strategy.py", line 34, in hybrid_to_proto2
raise InvalidTaskError(
celery.exceptions.InvalidTaskError: Task keyword arguments must be a mapping
显然,问题与发送到队列的消息的格式有关。我尝试以许多不同的格式发送它,我也尝试使用不同的celery 序列化程序,但仍然得到同样的错误。
1条答案
按热度按时间qojgxg4l1#
问题解决
正如预期的那样,消息格式不正确,需要在选项字段中指定内容类型/编码,并且还需要使用publish方法而不是sendQueue。
延迟函数应如下所示: