I have a Django application that uses Celery with RabbitMQ as the message broker. I have separated out the Scrapy project I use for scraping, and I want to push the scraped data into RabbitMQ so that Django (via Celery) can consume those messages. I need help consuming the messages that the Scrapy project pushes into RabbitMQ.
Code snippets.
In the Scrapy project, the item pipeline:
import pika


def process_item(self, item, spider):
    # Publish every scraped item to RabbitMQ.
    publish_message(item)
    return item


def publish_message(data):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()
    channel.basic_publish(exchange='topic', routing_key='scrapy',
                          body='Hello From scrapy!')
    connection.close()
In the Django app, consumers.py:
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', heartbeat=600,
                              blocked_connection_timeout=300))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" data =============== ", body)
    # I will call the Celery task here once this prints the data and I know
    # it is running. Unfortunately, it is not running. :(
    return


channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()
celery.py
import os

from celery import Celery
from kombu import Exchange, Queue

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings.development')

celery_app = Celery('my_project',
                    broker='amqp://guest:guest@rabbit:5672',
                    backend='rpc://0.0.0.0:5672')
celery_app.config_from_object('django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()
celery_app.conf.update(
    worker_max_tasks_per_child=1,
    broker_pool_limit=None,
)

default_exchange = Exchange('default', type='topic')
scrapy_exchange = Exchange('scrapy', type='topic')

celery_app.conf.task_queues = (
    Queue('scrapy', scrapy_exchange, routing_key='scrapy.#'),
)
1 Answer
The queue is never declared before you use it. Try the following:
Publisher
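A minimal sketch of what the publisher could look like, assuming you publish through the default exchange to a queue named 'scrapy' on a local RabbitMQ instance. The JSON serialization and the durable flag are assumptions for illustration, not part of the original code:

import json
import pika


def publish_message(data):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()

    # Declare the queue before publishing so it exists even if the
    # consumer has not started yet (queue_declare is idempotent).
    channel.queue_declare(queue='scrapy', durable=True)

    channel.basic_publish(
        exchange='',          # the default exchange routes by queue name
        routing_key='scrapy',
        body=json.dumps(dict(data)),  # assumption: items are JSON-serializable
    )
    connection.close()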
Consumer
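And a matching consumer sketch that declares the same queue (with the same arguments) before consuming. The durable flag and the my_task.delay call are assumptions shown only to illustrate where the Celery task would be triggered:

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', heartbeat=600,
                              blocked_connection_timeout=300))
channel = connection.channel()

# Declare the queue here as well; queue_declare is idempotent, so it is
# safe to call from both the publisher and the consumer.
channel.queue_declare(queue='scrapy', durable=True)


def callback(ch, method, properties, body):
    print(" data =============== ", body)
    # Trigger the Celery task here, e.g. my_task.delay(body)  (hypothetical task)


channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()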