我是刚来戈朗和卡法的新手,所以这个问题看起来很傻。
在我的kafka消费者第一次连接到kafka服务器之后,为什么在建立到kafka服务器的连接和接收第一条消息之间会有延迟(大约20秒)?
它会在 consumer.Messages()
并为收到的每条消息打印另一条消息。约20秒的延迟介于 fmt.Println
第二个呢 fmt.Println
.
package main
import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// Create the consumer and listen for new messages
consumer := createConsumer()
// Create a signal channel to know when we are done
done := make(chan bool)
// Start processing messages
go func() {
fmt.Println("Start consuming Kafka messages")
for msg := range consumer.Messages() {
s := string(msg.Value[:])
fmt.Println("Msg: ", s)
}
}()
<-done
}
func createConsumer() *cluster.Consumer {
// Define our configuration to the cluster
config := cluster.NewConfig()
config.Consumer.Return.Errors = false
config.Group.Return.Notifications = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create the consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"orders"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
log.Fatal("Unable to connect consumer to Kafka")
}
go handleErrors(consumer)
go handleNotifications(consumer)
return consumer
}
docker-compose.yml公司
version: '2'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:5.0.1"
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker-1:
image: "confluentinc/cp-enterprise-kafka:5.0.1"
hostname: broker-1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: rack-a
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: 'broker-1'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_CREATE_TOPICS: "orders:1:1"
1条答案
按热度按时间um6iljoc1#
在我的kafka消费者第一次连接到kafka服务器之后,为什么在建立到kafka服务器的连接和接收第一条消息之间会有延迟(大约20秒)?
不会有太多的延迟,因为消费者使用的是从Kafka接收消息的消息通道。一旦消息在kafka队列中可用,它将被发送到消费者可以接收的消息通道。
代码实现:-