我试图使我的服务重新阅读Kafka主题从头到尾就开始初始化内部数据结构。我正在使用confluent.net客户端。根据我的理解,以下代码应该订阅我的主题设置偏移到开头:
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
但由于某些原因,我没有收到主题中预先存在的消息,也没有收到新消息。我对assign()方法的理解有误吗?有没有一种方法可以使用subscribe()实现所需的结果,而不需要使用kafka cli硬重置偏移量?
这是一个完整的测试客户机,我的输出总是“没有消息…”,尽管topic有消息并且新消息正在到达。
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
var topic = "test-topic";
consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(5));
if (result == null)
Console.WriteLine("No messages...");
else
Console.WriteLine($"Offset: {result.Offset}");
}
}
2条答案
按热度按时间wbgh16ku1#
问题是我用partition.any调用了assign(),以下代码可以工作:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));
xmjla07d2#
为什么要使用
Assign
? 以下几点应该对您有用: