我在docker上安装了confluent kafka。在这个主题中,我有10个分区。问题是我不能使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用c#confluent.kafka driver 1.5.1(最新版本)和librd.kafka 1.5.0(最新版本)来使用本主题中的内容。
我开始使用的docker compose文件如下
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
networks:
- bridge_network
ports:
- "3001:3001"
environment:
ZOOKEEPER_CLIENT_PORT: 3001
ZOOKEEPER_TICK_TIME: 3000
broker:
image: confluentinc/cp-kafka
hostname: broker
depends_on:
- zookeeper
ports:
- "3002:3002"
networks:
- bridge_network
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
kafka_manager:
image: sheepkiller/kafka-manager
hostname: kafka_manager
depends_on:
- zookeeper
ports:
- '9000:9000'
networks:
- bridge_network
environment:
ZK_HOSTS: 'zookeeper:3001'
networks:
bridge_network:
driver: bridge
driver_opts:
com.docker.network.enable_ipv6: "false"
我在c#中的使用者配置如下:
var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
{
{ "bootstrap.servers", "PLAINTEXT://localhost:3002" },
{ "group.id", "some-test-group" },
{ "auto.offset.reset", "latest"},
{ "compression.codec", "gzip" },
{ "enable.auto.commit", "false" }
}).Build();
consumer.Subscribe("some-test-topic");
while (true)
{
var cr = consumer.Consume(30_000);
if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
{
System.Console.WriteLine("that's it");
break;
}
System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
}
我确信主题的分区中有消息,因为我可以使用kafka工具2.0检查主题
我用于kafka工具的配置是
我很确定我在配置文件中漏掉了一些东西,但是在阅读了两天的文档后,我仍然找不到这个问题。有人能帮忙吗?
2条答案
按热度按时间sirbozc51#
问题在于代理和主题复制因素。我使用docker compose文件部署kafka,我连接查看日志并显示以下消息:
为了解决这个问题,我必须为代理配置添加“kafka\u offsets\u topic\u replication\u factor:1”。因此,我的代理服务配置如下所示:
重新启动代理后,我能够生成/使用消息。
chhqkbe12#
你需要设置
auto.offset.reset
在用户运行时“最早”或生成主题消息。