我正在运行一个WebSocket服务器来实现实时数据流,并使用Kafka进行流处理。为每个客户端创建新的Kafka消费者是一个好的实践吗?到目前为止,我正在为每个客户端创建一个新的Kafka消费者,但如果我添加了超过5000个客户端,我会得到这个错误:
%2| 1683792545.811|螺纹|rdkafka#consumer-13806| [thrd:main]:无法创建代理线程
有谁知道怎么解决这个问题吗?
func(c *Client) SendDataTOConsumer() {
defer func() {
if r := recover(); r != nil {
log.Println("PANIC:", r)
}
}()
run := true
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092", // broker addr
"group.id": c.Id, // consumer group
"auto.offset.reset": "earliest", // earliest offset
}
// Create Kafka consumer
consumer, err := kafka.NewConsumer(config)
if err != nil {
log.Println(err)
}
defer func () {
consumer.Close()
}()
var topic []string
for key := range c.SubsTopics {
topic = append(topic, key)
}
if err := consumer.SubscribeTopics(topic, nil); err != nil{
log.Print("Error Subscribing topics: " ,err)
}
// Process messages
for run {
select {
case unSubscribe := <- c.UnsubChan:
run = false
if unSubscribe {
if err := consumer.Unsubscribe(); err != nil{
log.Println("Error while unsubscribing",err)
}
}
default:
message, err := consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
// Errors are informational and automatically handled by the consumer
continue
}
if err := c.Conn.WriteMessage(websocket.TextMessage, message.Value); err != nil {
//close the connection, consumer and stop the for loop if some error occurs while writing the message
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure){
break
}
run = false
}
}
}
}
这就是我如何为每个客户端创建新的消费者。
谁能给我推荐一个在Go中使用Kafka consumer来消费消息的好方法。
1条答案
按热度按时间llmtgqce1#
您可以创建一个监听所有主题的kafka消费者,而不是为每个客户端创建kafka消费者。对于来自主题的每条消息,找出应该向哪些客户端发送此消息并以这种方式进行处理。