我在用萨拉玛(https://github.com/shopify/sarama/)Kafka0.8.0为我的消费者。我的代码是这样的:
consumerLoop:
for {
select {
case event := <-consumer.Events():
if event.Err != nil {
break consumerLoop
panic(event.Err)
}
<-c.sem
go c.processJob(event.Value)
}
}
我使用缓冲通道(c.sem)来控制一次可以运行多少processjob goroutine。这就是我如何控制消费者的并发性/速度。
这种方法的问题是,如果需要更改并发性,就必须关闭使用者并重新启动它(通道缓冲区大小是命令行标志)。我记录处理的偏移量,并且我必须查看我的日志,以确定处理了哪些偏移量以及我希望消费者从何处恢复。我想要一个更免提的方法来管理这些偏移。
我已经在consumer.properties中将autocommit.enabled设置为true,但在zookeeper中没有看到任何更改。我认为这是因为当前的kafka协议不支持offset api:https://issues.apache.org/jira/browse/kafka-993
在处理完一个作业后,我可以尝试在zookeeper中手动存储偏移量,但我不知道在运行多个异步processjob时该如何工作。Kafka应该在这里存储偏移量:https://cwiki.apache.org/confluence/display/kafka/kafka+data+structures+in+zookeeper
这个值应该只有一个吗?如果这是真的,那就意味着我不能使用异步processjob,因为不同进程之间可能存在延迟,它们将覆盖彼此的值。使用者是否应该在单个线程中运行并一次处理单个事件?正确的方法是让更多的消费者加快速度,而不是走goroutines路线吗?
1条答案
按热度按时间1qczuiv01#
我想最简单的答案是不要使用信号量的通道。改为使用一个受锁保护的整数,然后可以调整最大可用goroutine而无需重新启动。
如果你真的想继续使用频道,你可以使用
ResizableChannel
从我的频道包:https://godoc.org/github.com/eapache/channels#resizablechannel