我想把Kafka和拉比特结合起来,
我可以将消息发送到rabbitmq,但消息没有被使用。通道号状态在rabbitmq控制台中显示为-0。
将用户更改为admin,但仍然是相同的问题。
通过confluent部署kafka(而不是单独部署zookeeper kafka等),还通过confluent hub安装rabbitmq连接器。
请帮忙。
[2019-03-26 06:39:19,151] ERROR Consumer io.confluent.connect.rabbitmq.ConnectConsumer@454774b5 (amq.ctag-Unaj3jmbQQctolAwNzU2SQ) method handleDelivery for channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1) (com.rabbitmq.client.impl.ForgivingExceptionHandler:124)
java.lang.NullPointerException
at io.confluent.connect.rabbitmq.MessageConverter.basicProperties(MessageConverter.java:127)
at io.confluent.connect.rabbitmq.SourceRecordBuilder.sourceRecord(SourceRecordBuilder.java:40)
at io.confluent.connect.rabbitmq.ConnectConsumer.handleDelivery(ConnectConsumer.java:69)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
1条答案
按热度按时间ecbunoof1#
我在评估rabbitmq-kafka连接器时遇到了完全相同的错误。
amqp消息由两部分组成。标头和消息内容。我只是提供消息内容,而不是标题。
必须使用元数据创建basicproperties对象。java文档可以在这里查看。
然后,当您使用
basicPublish
方法,使用前面创建的basicproperties对象作为参数之一。basicpublish的java文档可以在这里找到。
希望这有帮助