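I am trying to use RabbitMQ, Celery, and a Flask application to do a simple database update. ProcedureAPI.py is an API that receives data, inserts a record into the database, and pushes the data to the RabbitMQ server. Celery picks the data up from the RabbitMQ queue and updates the database. I am new to this, so please point out what I am doing wrong.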
consumer.py
from celery import Celery
import sqlite3
import time
# app = Celery('Task_Queue')
# default_config = 'celeryconfig'
# app.config_from_object(default_config)
app = Celery('tasks', backend='rpc://', broker='pyamqp://guest:guest@localhost')
@app.task(serializer='json')
def updateDB(x):
    x = x["item"]
    with sqlite3.connect("test.db") as conn:
        time.sleep(5)
        conn.execute('''updateQuery''', [x])
        # app.log(f"{x['item']} status is updated as completed!")
    return x
ProcedureAPI.py
from flask import Flask,request,jsonify
import pandas as pd
import sqlite3
import json
import pika
import configparser
parser = configparser.RawConfigParser()
configFilePath = 'appconfig.conf'
parser.read(configFilePath)
# RabbitMQ Config
rmq_username = parser.get('general', 'rmq_USERNAME')
rmq_password = parser.get('general', 'rmq_PASSWORD')
host= parser.get('general', 'rmq_IP')
port= parser.get('general', 'rmq_PORT')
# Database
DATABASE= parser.get('general', 'DATABASE_FILE')
app = Flask(__name__)
conn_credentials = pika.PlainCredentials(rmq_username, rmq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=host,
    port=port,
    credentials=conn_credentials))
channel = connection.channel()
@app.route('/create', methods=['POST'])
def create_main():
    if request.method=="POST":
        print(DATABASE)
        with sqlite3.connect(DATABASE) as conn:
            conn.execute('''CREATE TABLE table1
                (feild1 INTEGER PRIMARY KEY, -- AUTOINCREMENT
                feild2 varchar(20) NOT NULL,
                feild3 varchar(20) DEFAULT 'pending');''')
        return "Table created",202
@app.route('/getData', methods=['GET'])
def display_main():
    if request.method=="GET":
        with sqlite3.connect(DATABASE) as conn:
            df = pd.read_sql_query("SELECT * from table1", conn)
            df_list = df.values.tolist()
            JSONP_data = jsonify(df_list)
        return JSONP_data,200
@app.route('/', methods=['POST'])
def update_main():
    if request.method=="POST":
        updatedata=request.get_json()
        with sqlite3.connect(DATABASE) as conn:
            conn.execute("INSERT_Query")
            print("Records Inserted successfully")
        channel.queue_declare(queue='celery', durable=True)
        channel.basic_publish(exchange = 'celery',routing_key ='celery' ,body = json.dumps(updatedata),properties=pika.BasicProperties(delivery_mode = 2))
        return updatedata,202
# main driver function
if __name__ == '__main__':
    app.run()
Config file (appconfig.conf)
[general]
# RabbitMQ server (broker) IP address
rmq_IP=127.0.0.1
# RabbitMQ server (broker) TCP port number (5672 or 5671 for SSL)
rmq_PORT=5672
# queue name (storage node hostname)
rmq_QUEUENAME=Task_Queue
# RabbitMQ authentication
rmq_USERNAME=guest
rmq_PASSWORD=guest
DATABASE_FILE=test.db
# log file
receiver_LOG_FILE=cmdmq_receiver.log
sender_LOG_FILE=cmdmq_sender.log
Running Celery
celery -A consumer worker --pool=solo -l info
The error I get:
(env1) PS C:\Users\USER\Desktop\Desktop\Jobs Search\nodepython\flaskapp> celery -A consumer worker --pool=solo -l info
-------------- celery@DESKTOP-FRBNH77 v5.2.0 (dawn-chorus)
---*****-----
--*******---- Windows-10-10.0.19041-SP0 2021-11-12 17:35:04
-***--- * ---
-**---------- [config]
-**---------- .> app: tasks:0x1ec10c9c5c8
-**---------- .> transport: amqp://guest:**@localhost:5672//
-**---------- .> results: rpc://
-***--- * --- .> concurrency: 12 (solo)
--*******---- .> task events: OFF (enable -E to monitor tasks in this worker)
---*****-----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. consumer.updateDB
[2021-11-12 17:35:04,546: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-11-12 17:35:04,571: INFO/MainProcess] mingle: searching for neighbors
[2021-11-12 17:35:05,594: INFO/MainProcess] mingle: all alone
[2021-11-12 17:35:05,605: INFO/MainProcess] celery@DESKTOP-FRBNH77 ready.
[2021-11-12 17:35:14,952: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"item": "1BOOK"}' (17b)
{content_type:None content_encoding:None
delivery_info:{'consumer_tag': 'None4', 'delivery_tag': 1, 'redelivered': False, 'exchange':
'celery', 'routing_key': 'celery'} headers={}}
Any reference code or suggestions would be a great help.
1 Answer
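It seems the exchange has not been declared and bound to the queue the messages should be routed to.
A producer (your pika script) cannot deliver a message to a queue directly. It publishes to an intermediary, the exchange, so a message travels along this route:
P >> E >> B >> Q (producer, exchange, binding, queue)
Depending on its type, the exchange routes the message to one or more queues.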
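To make the role of the exchange type concrete, here is a toy in-memory model of routing. It is only an illustration, not a broker client: the `Exchange` class, the exchange names, the `audit_queue` queue, and the keys are all hypothetical; only `Task_Queue` comes from the config file in the question.

```python
class Exchange:
    """Toy in-memory model of an AMQP exchange; not a real broker client."""

    def __init__(self, name, exchange_type="direct"):
        if exchange_type not in ("direct", "fanout"):
            raise ValueError(f"unsupported exchange type: {exchange_type}")
        self.name = name
        self.exchange_type = exchange_type
        self.bindings = []  # (queue_name, binding_key) pairs

    def bind(self, queue_name, binding_key=""):
        """Record that queue_name is attached to this exchange."""
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key):
        """Return the names of the queues that would receive the message."""
        if self.exchange_type == "fanout":
            # A fanout exchange ignores the routing key and copies the
            # message to every bound queue.
            return [queue for queue, _ in self.bindings]
        # A direct exchange delivers only where the binding key equals
        # the routing key of the message.
        return [queue for queue, key in self.bindings if key == routing_key]


direct = Exchange("task_exchange", "direct")
direct.bind("Task_Queue", "task")
direct.bind("audit_queue", "audit")

fanout = Exchange("broadcast_exchange", "fanout")
fanout.bind("Task_Queue")
fanout.bind("audit_queue")

print(direct.route("task"))      # queues reached through the direct exchange
print(fanout.route("anything"))  # queues reached through the fanout exchange
```

In this model a message whose routing key matches no entry on the direct exchange reaches no queue at all, which is why the attachment between exchange and queue matters.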
A binding, as the name suggests, ties an exchange to a queue; how the binding's key is matched depends on the exchange type.
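A minimal sketch of declaring the exchange, the queue, and the binding with pika before publishing might look like the following. The exchange name `task_exchange` and the routing key `task` are assumptions made for illustration; the queue name and connection defaults are taken from the config file in the question.

```python
import json

EXCHANGE = "task_exchange"  # hypothetical exchange name
QUEUE = "Task_Queue"        # rmq_QUEUENAME from appconfig.conf
ROUTING_KEY = "task"        # hypothetical binding / routing key


def encode_body(payload):
    """Serialize the request payload to the bytes that go on the wire."""
    return json.dumps(payload).encode("utf-8")


def publish(payload, host="127.0.0.1", port=5672,
            username="guest", password="guest"):
    """Declare exchange, queue and binding, then publish one message."""
    # Imported here so the module still loads where pika is not installed.
    import pika

    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=host, port=port, credentials=credentials)
    )
    try:
        channel = connection.channel()
        # E: the exchange the producer publishes to
        channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct",
                                 durable=True)
        # Q: the queue the consumer reads from
        channel.queue_declare(queue=QUEUE, durable=True)
        # B: the binding that connects the two
        channel.queue_bind(queue=QUEUE, exchange=EXCHANGE,
                           routing_key=ROUTING_KEY)
        channel.basic_publish(
            exchange=EXCHANGE,
            routing_key=ROUTING_KEY,
            body=encode_body(payload),
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # mark the message as persistent
            ),
        )
    finally:
        connection.close()
```

Calling `publish({"item": "1BOOK"})` needs a reachable RabbitMQ server. The body is plain JSON, so whatever consumes `Task_Queue` has to parse it itself.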
For details, see:
https://hevodata.com/learn/rabbitmq-exchange-type/
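One more thing may be worth checking. The worker log in the question shows the message arriving through the `celery` exchange and then being discarded as unknown, which suggests the raw JSON body was not in the task message format a Celery worker expects. A possible alternative, sketched here under that assumption, is to let Celery's own client build the message with `send_task`. The helper names `task_args` and `enqueue_update` are hypothetical; the task name `consumer.updateDB` and the broker URL are the ones shown in the question.

```python
TASK_NAME = "consumer.updateDB"  # task name listed in the worker banner
BROKER_URL = "pyamqp://guest:guest@localhost"  # broker URL from consumer.py


def task_args(payload):
    """Check the payload and wrap it as the task's positional arguments."""
    if "item" not in payload:
        raise ValueError("payload must contain an 'item' key")
    return [payload]


def enqueue_update(payload, broker_url=BROKER_URL):
    """Queue consumer.updateDB by name, without importing consumer.py."""
    # Imported here so the module still loads where Celery is not installed.
    from celery import Celery

    client = Celery("tasks", backend="rpc://", broker=broker_url)
    # send_task builds a properly formatted task message and publishes it
    # to the default queue that the worker is consuming from.
    return client.send_task(TASK_NAME, args=task_args(payload))
```

Inside the Flask view, `enqueue_update(updatedata)` would then take the place of the `queue_declare` and `basic_publish` calls; it needs the same running broker as the worker.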