我已经设置了一个连接器,从rabbitmq队列中拉入一个kafka主题。连接器运行,队列清空。但是当我用kafka控制台消费者或kafkacat查看主题时,每个条目看起来都像一个字节数组- [B@xx
.
rabbitmq消息有效负载都是json。我需要做些什么才能从Kafka那里得到json?我试过了 value.converter=org.apache.kafka.connect.storage.StringConverter
以及对控制台使用者使用bytearraydeserializer。
connect-standalone.properties:独立连接:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/robbie/kafka/plugins
rabbitmqsourceconnector.properties:属性:
name=rabbitmq
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
rabbitmq.prefetch.count=500
rabbitmq.automatic.recovery.enabled=false
rabbitmq.network.recovery.interval.ms=10000
rabbitmq.topology.recovery.enabled=true
rabbitmq.queue=test1
rabbitmq.username=testuser1
rabbitmq.password=xxxxxxxxxxxxxxx
rabbitmq.host=rmqhost
rabbitmq.port=5672
kafka.topic=rabbitmq.test1
1条答案
按热度按时间az31mfrm1#
你需要设置
我今天就写了一篇博客:)