版本:golang 1.10.2 kafka 4.4.1 docker 18.03.1
我正在尝试使用shopify的sarama包来测试我的kafka示例。我用docker compose来支持kafka/zookeeper,一切都很成功。
当我尝试用sarama创建producer客户机时,抛出了一个错误。
当我运行以下命令时
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
"strconv"
"github.com/Shopify/sarama"
)
func main() {
// Setup configuration
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:29092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
// Should not reach here
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
// Should not reach here
panic(err)
}
}()
我明白了
【萨拉玛】2018/06/12 17:22:05初始化新客户
[sarama]2018/06/12 17:22:05客户端/元数据从代理获取所有主题的元数据localhost:29092
【萨拉马】2018/06/12 17:22:05联系经纪人localhost:29092 (未注册)
[sarama]2018/06/12 17:22:05客户端/元数据在获取元数据时从代理获取错误:eof
【萨拉马】2018/06/12 17:22:05与经纪人的封闭连接localhost:29092
{sarama]2018/06/12 17:22:05客户端/元数据没有可用的代理可以向其发送元数据请求
[萨拉马]2018/06/12 17:22:06结束客户恐慌:Kafka:客户已经没有可用的经纪人可与之交谈(您的集群是否可以访问?)
goroutine1[running]:main.main()/users/benwornom/go/src/github.com/acstech/doppler events/testprod/main.go:29+0x3ec退出状态2
sarama确实连续几次尝试创建producer客户机,但每次都失败了。
我对sarama的“newasyncproducer”方法的理解是,它调用“newclient”,无论您是创建生产者还是消费者,都会调用它。newclient试图从kafka代理收集元数据,但在我的情况下失败了。我知道它和Kafka经纪人有联系,但一旦联系起来,它就好像断了。任何建议都会有帮助。我的网络连接很强,我想不出任何东西会干扰服务器。据我所知,对于现有的主题,我只有一个代理和一个分区。我认为我不必手动将主题分配给代理。如果我的客户与经纪人有联系,为什么我不能为我的制作人建立持久的联系?
这是Kafka死前的日志文件。
__消费者补偿-5->矢量(1),连接-补偿-23->矢量(1),消费者补偿-43->矢量(1),消费者补偿-32->矢量(1),消费者补偿-21->矢量(1),消费者补偿-10->矢量(1),连接-补偿-20->矢量(1),消费者补偿-37->矢量(1),连接-补偿-9->矢量(1),连接-状态-4->矢量(1),消费者偏移-48->向量(1),\uu消费者偏移-40->向量(1),\uu消费者偏移-29->向量(1),\uu消费者偏移-18->向量(1),连接偏移-14->向量(1),\uu消费者偏移-7->向量(1),\uu消费者偏移-34->向量(1),\uu消费者偏移-45->向量(1),\uu消费者偏移-23->向量(1),connect-OFFSET-6->矢量(1),connect-status-1->矢量(1),connect-OFFSET-17->矢量(1),connect-OFFSET-0->矢量(1),connect-OFFSET-22->矢量(1),connect-OFFSET-26->矢量(1),connect-OFFSET-11->矢量(1),connect-OFFSET-15->矢量(1),connect-OFFSET-4->矢量(1),connect-42->矢量(1),消费者偏移量-9->向量(1),\u消费者偏移量-31->向量(1),\u消费者偏移量-20->向量(1),连接偏移量-3->向量(1),\u消费者偏移量-1->向量(1),\u消费者偏移量-12->向量(1),连接偏移量-8->向量(1),连接偏移量-19->向量(1),连接状态-3->向量(1),\u confluent.support.metrics-0->向量(1),消费者偏移-17->向量(1),\uu消费者偏移-28->向量(1),\uu消费者偏移-6->向量(1),\uu消费者偏移-39->向量(1),\uu消费者偏移-44->向量(1),连接偏移-16->向量(1),连接状态-0->向量(1),连接偏移-5->向量(1),连接偏移-21->向量(1),\uu消费者偏移-47->向量(1),用户偏移-36->矢量(1),\uu用户偏移-14->矢量(1),\uu用户偏移-25->矢量(1),\uu用户偏移-3->矢量(1),\uu用户偏移-30->矢量(1),\uu用户偏移-41->矢量(1),连接偏移-13->矢量(1),连接偏移-24->矢量(1),连接偏移-2->矢量(1),连接配置-0->矢量(1),_uuconsumer_offset-11->向量(1),u consumer_offset-22->向量(1),uu consumer_offset-33->向量(1),uu consumer_offset-0->向量(1),connect-offset-7->向量(1),connect-offset-18->向量(1))(kafka.controller.kafkacontroller)[36mkafka_1 |[0m[2018-06-12 20:24:47,461]调试[controller id=1]主题不在broker 1 map()的首选副本中(kafka.controller.kafkantroller)[36mkafka|u 1 |[0m[2018-06-12 20:24:47462]跟踪[controller id=1]broker 1的引线不平衡比率为0.0(kafka.controller.kafkantroller)
1条答案
按热度按时间gmxoilav1#
我遇到了同样的问题,我做了以下几点来解决我的问题:
检查你使用的Kafka版本。传递给时在配置中指定的默认Kafka版本
NewAsyncProducer
是V0_8_2_0
. 确保在配置中指定正确的Kafka版本:确保您传递的代理url是正确的。您应该传递代理url,而不是zookeeper url或其他url。默认的Kafka端口是
9092
所以url应该类似于BROKER_URL:9092
如果使用默认端口。