Pushing and sharing messages between a Django project and Scrapy with RabbitMQ

dxxyhpgq  posted on 2022-11-08  in  RabbitMQ

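I have a Django application that uses Celery with RabbitMQ as the message broker. I also have a separate Scrapy project that scrapes data, and I want to push the scraped data into RabbitMQ so that Django can consume those messages through Celery. I need help consuming the messages that the Scrapy project pushes into RabbitMQ.
Code snippets: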

Scrapy

def process_item(self, item, spider):
    publish_message(item)    
    return item

def publish_message(data):
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()
    channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From scrapy!')
    connection.close()

In the Django app, consumers.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
                                                               blocked_connection_timeout=300))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" data =============== ", data)
    # I will call celery task here once code print the data to make sure its running. unfortunately its not running. :( 
    return 

channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()

celery.py

import os

from celery import Celery
from kombu import Exchange, Queue

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings.development')

celery_app = Celery('my_project', broker='amqp://guest:guest@rabbit:5672', backend='rpc://0.0.0.0:5672')
celery_app.config_from_object('django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()

celery_app.conf.update(
    worker_max_tasks_per_child=1,
    broker_pool_limit=None
)

default_exchange = Exchange('default', type='topic')
scrapy_exchange = Exchange('scrapy', type='topic')

celery_app.conf.task_queues = (
    Queue('scrapy', scrapy_exchange, routing_key='scrapy.#'),
)

6l7fqoea  #1

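The queue is never declared, or bound to the exchange, before it is consumed. The callback should also print body, since data is not defined in that scope. Try the following: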

Publisher

def process_item(self, item, spider):
    publish_message(item)
    return item

def publish_message(data):
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic')
    channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From scrapy!')
    connection.close()
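
The publisher above always sends the fixed string 'Hello From scrapy!' and ignores its data argument. A minimal sketch of sending the scraped item itself follows, assuming the item is a plain dict or a scrapy.Item (both accept dict(item)) and that JSON is an acceptable wire format. The helper names encode_item and publish_item are hypothetical and not part of the original answer; the channel is expected to be an already-open pika channel.

```python
import json


def encode_item(item):
    """Serialize a scraped item to UTF-8 JSON bytes for the message body."""
    # dict(item) works for plain dicts and for scrapy.Item instances.
    # default=str is a blunt fallback for values such as datetimes.
    return json.dumps(dict(item), default=str).encode("utf-8")


def publish_item(channel, item, exchange="topic", routing_key="scrapy"):
    """Publish one item on an already-open pika channel."""
    # Same exchange and routing key as the publisher above; only the
    # body changes from a fixed string to the serialized item.
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=encode_item(item),
    )
```

Inside publish_message, the basic_publish line could then be replaced by publish_item(channel, data).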

Consumer

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
                                                               blocked_connection_timeout=300))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" data =============== ", body)
    # I will call celery task here once code print the data to make sure its running. unfortunately its not running. :( 
    return

channel.exchange_declare(exchange='topic')
channel.queue_declare(queue='scrapy')
channel.queue_bind(exchange='topic', queue='scrapy', routing_key='scrapy')
channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()
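
The question mentions calling a Celery task from the callback once messages arrive. One possible shape for that is sketched below, assuming the publisher sends JSON-encoded bodies. The factory make_callback is a hypothetical helper, and handle_item stands in for whatever should receive the decoded item, for example the delay method of a Celery task defined in the Django project.

```python
import json


def make_callback(handle_item):
    """Build a pika on_message_callback that decodes JSON and forwards it."""

    def callback(ch, method, properties, body):
        try:
            # pika delivers the body as bytes.
            item = json.loads(body.decode("utf-8"))
        except ValueError:
            # Both UnicodeDecodeError and json.JSONDecodeError are
            # subclasses of ValueError; skip messages that are not JSON.
            print("Skipping non-JSON message:", body)
            return
        handle_item(item)

    return callback
```

With a hypothetical task named process_scraped_item, the consumer would pass on_message_callback=make_callback(process_scraped_item.delay) to channel.basic_consume. Keeping the handler injectable also lets the callback be exercised without a running broker.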
