Using RabbitMQ as the producer and Celery as the consumer

x8diyxa7  posted on 2022-11-08  in RabbitMQ

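I am trying to use RabbitMQ, Celery, and a Flask app to simply update a database. ProcedureAPI.py is an API that takes the data, inserts a record into the database, and pushes the data to the RabbitMQ server. Celery takes the data from the RabbitMQ queue and updates the database. I am new to this, so please point out what I am doing wrong.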
consumer.py

from celery import Celery
import sqlite3
import time

# app = Celery('Task_Queue')

# default_config = 'celeryconfig'

# app.config_from_object(default_config)

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest:guest@localhost')

@app.task(serializer='json')
def updateDB(x):
    x=x["item"]
    with sqlite3.connect("test.db") as conn:
        time.sleep(5)
        conn.execute('''updateQuery''', [x])
        # app.log(f"{x['item']} status is updated as completed!")
    return x

ProcedureAPI.py

from flask import Flask,request,jsonify
import pandas as pd
import sqlite3
import json
import pika
import configparser

parser = configparser.RawConfigParser()   
configFilePath = 'appconfig.conf'
parser.read(configFilePath)

# RabbitMQ Config

rmq_username = parser.get('general', 'rmq_USERNAME')
rmq_password = parser.get('general', 'rmq_PASSWORD')
host= parser.get('general', 'rmq_IP')
port= parser.get('general', 'rmq_PORT')

# Database

DATABASE= parser.get('general', 'DATABASE_FILE')

app = Flask(__name__)
conn_credentials = pika.PlainCredentials(rmq_username, rmq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=host,
        port=port,
        credentials=conn_credentials))
channel = connection.channel()

@app.route('/create', methods=['POST'])
def create_main():
   if request.method=="POST":
       print(DATABASE)
       with sqlite3.connect(DATABASE) as conn:
           conn.execute('''CREATE TABLE table1
                  (feild1 INTEGER PRIMARY KEY,              -- AUTOINCREMENT
                  feild2           varchar(20)    NOT NULL,
                  feild3         varchar(20)    DEFAULT 'pending');''')
           return "Table created",202

@app.route('/getData', methods=['GET'])
def display_main():
   if request.method=="GET":
       with sqlite3.connect(DATABASE) as conn:
           df = pd.read_sql_query("SELECT * from table1", conn)
           df_list = df.values.tolist()
           JSONP_data = jsonify(df_list)
           return JSONP_data,200

@app.route('/', methods=['POST'])
def update_main():
    if request.method=="POST":
       updatedata=request.get_json()
       with sqlite3.connect(DATABASE) as conn:
           conn.execute("INSERT_Query")
           print("Records Inserted successfully")
           channel.queue_declare(queue='celery', durable=True)
           channel.basic_publish(exchange = 'celery',routing_key ='celery' ,body = json.dumps(updatedata),properties=pika.BasicProperties(delivery_mode = 2))
           return updatedata,202

# main driver function

if __name__ == '__main__':
    app.run()

Config file

[general]

# RabbitMQ server (broker) IP address

rmq_IP=127.0.0.1

# RabbitMQ server (broker) TCP port number (5672 or 5671 for SSL)

rmq_PORT=5672

# queue name (storage node hostname)

rmq_QUEUENAME=Task_Queue

# RabbitMQ authentication

rmq_USERNAME=guest
rmq_PASSWORD=guest

DATABASE_FILE=test.db

# log file

receiver_LOG_FILE=cmdmq_receiver.log
sender_LOG_FILE=cmdmq_sender.log

Running Celery

celery -A consumer worker --pool=solo -l info

The error I get:

(env1) PS C:\Users\USER\Desktop\Desktop\Jobs Search\nodepython\flaskapp> celery -A consumer worker --pool=solo -l info

 -------------- celery@DESKTOP-FRBNH77 v5.2.0 (dawn-chorus)
---*****-----
--*******---- Windows-10-10.0.19041-SP0 2021-11-12 17:35:04
-***--- * ---
-**---------- [config]
-**---------- .> app:         tasks:0x1ec10c9c5c8
-**---------- .> transport:   amqp://guest:**@localhost:5672//
-**---------- .> results:     rpc://
-***--- * --- .> concurrency: 12 (solo)
--*******---- .> task events: OFF (enable -E to monitor tasks in this worker)
---*****-----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . consumer.updateDB

[2021-11-12 17:35:04,546: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//      
[2021-11-12 17:35:04,571: INFO/MainProcess] mingle: searching for neighbors
[2021-11-12 17:35:05,594: INFO/MainProcess] mingle: all alone
[2021-11-12 17:35:05,605: INFO/MainProcess] celery@DESKTOP-FRBNH77 ready.
[2021-11-12 17:35:14,952: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: '{"item": "1BOOK"}' (17b)
{content_type:None content_encoding:None
  delivery_info:{'consumer_tag': 'None4', 'delivery_tag': 1, 'redelivered': False, 'exchange': 
'celery', 'routing_key': 'celery'} headers={}}

Any reference code or suggestions would be a great help.

kxxlusnw #1

It looks like the exchange has not been declared and bound to the queue you want to route to.

channel.exchange_declare(exchange='exchange_name', exchange_type="type_of_exchange")

channel.queue_bind(exchange='exchange_name', queue='your_queue_name')
  • Producer: P
  • Exchange: E
  • Queue: Q
  • Binding: B

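The producer (your pika script) cannot send a message directly to a queue. It needs an intermediary, the exchange, so a message is routed like this:
P >> E >> B >> Q
Depending on its type, the exchange routes the message to one or more queues.
A binding, as the name suggests, ties an exchange to a queue according to the exchange type.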
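
To make the P >> E >> B >> Q path concrete, here is a minimal pure-Python sketch of how a direct exchange might behave. It is only a toy model, not RabbitMQ or pika code: the `DirectExchange` class and its `bind` and `publish` methods are hypothetical names made up for illustration, and a queue is just a Python list.

```python
from collections import defaultdict


class DirectExchange:
    """Toy stand-in for a direct exchange: routes on an exact routing-key match."""

    def __init__(self, name):
        self.name = name
        # routing key -> list of bound queues (each queue is a plain list here)
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        """B: tie a queue to this exchange under a routing key."""
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        """P >> E: copy the body into every queue bound under routing_key."""
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(body)
        return len(queues)  # 0 means nothing was bound, so nothing was delivered


exchange = DirectExchange("exchange_name")
task_queue = []

# No binding yet: the exchange has nowhere to send the message.
dropped = exchange.publish("your_queue_name", '{"item": "1BOOK"}')

# After binding, the same publish reaches the queue.
exchange.bind(task_queue, routing_key="your_queue_name")
delivered = exchange.publish("your_queue_name", '{"item": "1BOOK"}')

print(dropped, delivered, task_queue)
```

The first publish finds no binding and delivers to zero queues. The second one lands in `task_queue` because the binding now exists, which is the role `exchange_declare` and `queue_bind` play in the pika calls above.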
For details, see:
https://hevodata.com/learn/rabbitmq-exchange-type/
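
Separately from the exchange and binding, the worker log in the question shows the message arriving with `headers={}` and `content_type:None`. As far as I understand Celery, a worker treats a message as unknown when it has no task name in its headers, so a raw JSON body published with pika may be rejected even once routing is correct. The sketch below shows roughly what a task message looks like under my reading of Celery's documented message protocol version 2. `build_task_message` is a hypothetical helper, the field list is an assumption based on that documentation, and nothing here talks to a broker.

```python
import json
import uuid


def build_task_message(task_name, args, kwargs=None):
    """Return (headers, properties, body) shaped like a protocol-2 task message.

    Hypothetical helper for illustration; the field names are an assumption
    based on Celery's message protocol documentation.
    """
    task_id = str(uuid.uuid4())
    kwargs = kwargs or {}
    headers = {
        "lang": "py",
        "task": task_name,  # the worker looks the task up by this name
        "id": task_id,
        "argsrepr": repr(args),
        "kwargsrepr": repr(kwargs),
    }
    properties = {
        "correlation_id": task_id,
        "content_type": "application/json",
        "content_encoding": "utf-8",
    }
    # The body is a triple: positional args, keyword args, and an "embed" dict.
    embed = {"callbacks": None, "errbacks": None, "chain": None, "chord": None}
    body = json.dumps([list(args), kwargs, embed])
    return headers, properties, body


headers, properties, body = build_task_message(
    "consumer.updateDB", ({"item": "1BOOK"},)
)
print(headers["task"])
print(body)
```

In practice it is usually simpler to let Celery build this message: importing the task in the Flask app and calling `updateDB.delay({"item": "1BOOK"})`, or calling `send_task("consumer.updateDB", args=[{"item": "1BOOK"}])` on the Celery app, publishes it in the expected format.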
