为每个客户端创建新的Kafka消费者是否合适?

h5qlskok  于 2023-05-16  发布在  Apache
关注(0)|答案(1)|浏览(129)

我正在运行一个WebSocket服务器来实现实时数据流,并使用Kafka进行流处理。为每个客户端创建新的Kafka消费者是一个好的实践吗?到目前为止,我正在为每个客户端创建一个新的Kafka消费者,但如果我添加了超过5000个客户端,我会得到这个错误:
%2| 1683792545.811|螺纹|rdkafka#consumer-13806| [thrd:main]:无法创建代理线程
有谁知道怎么解决这个问题吗?

func(c *Client) SendDataTOConsumer() {  
    defer func() {
        if r := recover(); r != nil {
            log.Println("PANIC:", r)
        }
    }()
    
    run := true
    config := &kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // broker addr
        "group.id": c.Id, // consumer group
        "auto.offset.reset": "earliest", // earliest offset
    }

    // Create Kafka consumer
    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        log.Println(err)
    }

    defer func ()  {
        consumer.Close()
    }()

    var topic []string
    for key := range c.SubsTopics {
        topic = append(topic, key)
    }

    if err := consumer.SubscribeTopics(topic, nil); err != nil{
        log.Print("Error Subscribing topics: " ,err)
    }

    // Process messages
    for run {
        select {
        case unSubscribe := <- c.UnsubChan:
            run = false
            if unSubscribe {
                if err := consumer.Unsubscribe(); err != nil{
                    log.Println("Error while unsubscribing",err)
                }
            }
        
        default:
            message, err := consumer.ReadMessage(100 * time.Millisecond)
            if err != nil {
                // Errors are informational and automatically handled by the consumer
                continue
            }

            if err := c.Conn.WriteMessage(websocket.TextMessage, message.Value); err != nil {
                //close the connection, consumer and stop the for loop if some error occurs while writing the message
                if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure){
                    break
                }
                run = false
            }
        }
    }
}

这就是我如何为每个客户端创建新的消费者。
谁能给我推荐一个在Go中使用Kafka consumer来消费消息的好方法。

llmtgqce

llmtgqce1#

您可以创建一个监听所有主题的kafka消费者,而不是为每个客户端创建kafka消费者。对于来自主题的每条消息,找出应该向哪些客户端发送此消息并以这种方式进行处理。

相关问题