Why does the Camel Kafka RabbitMQ connector convert my messages into an unreadable format?

9fkzdhlc  posted on 2021-06-04  in Kafka

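My goal is to set up a connector between a RabbitMQ exchange/queue and a Kafka topic.
I followed the guide at https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html to set up the connector. I downloaded and installed the connector, camel-rabbitmq-kafka-connector, from https://github.com/apache/camel-kafka-connector and pointed plugin.path in connect-standalone.properties at the folder where I extracted the camel-rabbitmq-kafka-connector JARs.
The properties I use for CamelRabbitmqSourceConnector are as follows: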

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into

topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.

camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue

My docker run command for RabbitMQ is: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management . For Kafka, I used the standard "getting started" guide.
I send messages with the Python pika library:

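import pika

# Connect to the RabbitMQ broker on localhost (default port 5672)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare the queue as durable and auto-delete
channel.queue_declare(queue='myqueue', durable=True, auto_delete=True)
# Publish through the default exchange (''), routed by queue name
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')
connection.close()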

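As you can see, I send the message without specifying a value for the exchange parameter of channel.basic_publish. If I set it to the value of camel.source.path.exchangeName, my messages get lost somewhere in between, so I am probably missing something here.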
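For reference, a message published to a named exchange is dropped by RabbitMQ when no binding routes it to a queue. The sketch below shows what publishing through a named exchange could look like with pika, with the binding made explicit. The helper name `publish_via_exchange`, the `direct` exchange type, the `durable=True` exchange flag, and the use of `myqueue` as the routing key are all assumptions for illustration. They are not taken from the connector's documentation, and they may need adjusting to match what the connector itself declares.

```python
def publish_via_exchange(channel, exchange, queue, routing_key, body):
    """Declare an exchange, bind the queue to it, then publish one message.

    `channel` is expected to behave like a pika BlockingChannel.
    """
    # Assumption: a durable direct exchange; adjust to match whatever
    # the consuming side declares for the same exchange name.
    channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
    # Same queue flags as the original snippet in the question.
    channel.queue_declare(queue=queue, durable=True, auto_delete=True)
    # Without a binding, the exchange has nowhere to route the message.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)


def main():
    # Imported here so the helper above can be exercised without a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        # Assumption: the routing key equals the queue name.
        publish_via_exchange(connection.channel(), 'myexchange', 'myqueue',
                             'myqueue', 'some body...')
    finally:
        connection.close()
```

Calling `main()` runs the sketch against a broker on localhost. If the exchange or queue already exists with different flags, RabbitMQ rejects the declaration, so the flags here have to agree with those used by the other side.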


pnwntuvh #1

I was able to get it working with the following properties:

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into

topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.

camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
camel.source.endpoint.autoDelete=false
camel.source.endpoint.skipExchangeDeclare=true
camel.source.endpoint.skipQueueBind=true
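
RabbitMQ rejects a queue declaration whose flags differ from those of an existing queue with the same name, so a publisher that declares `myqueue` itself needs flags that agree with the connector's. As a rough sketch, the queue flags can be read from the properties above and reused on the publishing side. `queue_declare_kwargs` is a hypothetical helper, the option-to-argument mapping is an illustration, and the `durable=True` default is an assumption about the connector's behaviour, not something confirmed in this answer.

```python
def queue_declare_kwargs(properties_text, prefix='camel.source.endpoint.'):
    """Translate connector properties into keyword arguments for pika's queue_declare."""
    # Assumption: the connector declares a durable queue unless told otherwise.
    settings = {'durable': True}
    # Hypothetical mapping from connector option names to pika argument names.
    option_to_kwarg = {'autoDelete': 'auto_delete', 'durable': 'durable'}
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blanks, comments, and anything that is not key=value.
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, value = line.split('=', 1)
        key = key.strip()
        option = key[len(prefix):] if key.startswith(prefix) else None
        if option == 'queue':
            settings['queue'] = value.strip()
        elif option in option_to_kwarg:
            settings[option_to_kwarg[option]] = value.strip().lower() == 'true'
    return settings


example = """
camel.source.endpoint.queue=myqueue
camel.source.endpoint.autoDelete=false
camel.source.endpoint.skipQueueBind=true
"""
print(queue_declare_kwargs(example))
```

With a pika channel, the result could then be passed as `channel.queue_declare(**queue_declare_kwargs(text))` so that the publisher and the connector describe the queue the same way.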

rqcrx0a6 #2

I solved this by switching my client from Python to Java, following https://www.rabbitmq.com/tutorials/tutorial-one-java.html
