我的目标是在rabbitmq交换队列和kafka主题之间设置一个连接器。
我按照此指南设置连接器:https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html. 我从以下源下载并安装了连接器:https://github.com/apache/camel-kafka-connector,为 camel-rabbitmq-kafka-connector
. 我还指出 plugin.path
到我解压的文件夹 camel-rabbitmq-kafka-connector
connect-standalone.properties中的jar。
我用于 CamelRabbitSourceConnector
具体如下:
name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1
# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# comma separated topics to send messages into
topics=mytopic
# mandatory properties (for a complete properties list see the connector documentation):
# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
我对rabbitmq的docker run命令如下所示: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management
. 对于Kafka,我使用了标准的“入门”指南。
使用python pika库发送消息:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue',durable=True,auto_delete=True)
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')
如你所见,我发送的消息没有具体说明 exchange
中的参数 channel.basic_publish
功能。如果我把它设为 camel.source.path.exchangeName
,然后我的信息就在两者之间的某个地方丢失了,所以我可能在这里遗漏了一些东西。
2条答案
按热度按时间pnwntuvh1#
我可以使用以下属性使其工作:
rqcrx0a62#
我通过将我的客户机更改为java解决了这个问题:https://www.rabbitmq.com/tutorials/tutorial-one-java.html 而不是python。