如何同时经营Kafka消费者和生产者

r7knjye2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(261)

我想让Kafka和制片人兼任。
我想先启动consumer,然后将消息生成主题。消费者应该等待消息(直到某个截止日期,或者应该在后台无限期运行)。同时我可以开始制作。我用的是段库。我试图用goroutines实现它,但是失败了。消费者:

func (k *kafkaMessage) Init() {
       k.msgs = make(chan kafka.Message, 300000)
        }
        type kafkaMessage struct{
        msgs   chan kafka.Message
        messageCountTenant map[uint32]int64
        }
        func NewKafka() *kafkaMessage {
        return &kafkaMessage{}
        }
        func(k *kafkaMessage) Run() {
        var wg sync.WaitGroup
        go func(reader *kafkaMessage) {
            defer wg.Done()
            for {
                msg, more := <-reader.Msgs()
                if !more {
                    fmt.Println(errors.New("reader didn't finish pending batches"))
                }
                if(err!=nil){
                    log.GetLogger().Println(err)
                }
            }
        }(k)
        wg.Wait()
        log.GetLogger().Println(k.messageCountTenant)
       }
         func (k *kafkaMessage)ConsumeFromKafka(kafkaURL string,topic string, partition uint32)error{
        config := kafka.ReaderConfig{
            Brokers:   []string{kafkaURL},
            Topic:     topic,
            Partition: int(partition),
            MinBytes:  10e3, // 10KB
            MaxBytes:  10e6, // 10MB
        }
        var client *kafka.Reader
        client = kafka.NewReader(config)
        var cont context.Context
        cont,_=context.WithCancel(context.Background())
        var wg sync.WaitGroup
        client.SetOffset(-2)
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                m, err:= client.ReadMessage(cont)
                if err != nil {
                    log.Error(eventConfig.Name, "Error from reader:", err)
                    if cont.Err() != nil {
                        return
                    }
                    continue
                }
                k.msgs <- m
            }
        }()
        //wg.Wait()
        return nil
        }
         func (k *kafkaMsg) startConsumer(KafkaURL string,topic string,partitionlist []uint32){
         for _, partition := range partitionList {
                _= k.ConsumeFromKafka(kafkURL, topic,partition)
            }
            k.Run()
       }

制作人:

func IngestionIntoKafka(count,topic, KafkaURL string){
          kafkaWriter := getKafkaWriter(KafkaURL, topic)
            for count > 0 {
            event :=getMessages()//this function is returning messages
            err = SendDataToKafka(event, kafkaWriter)
            defer kafkaWriter.Close()
            count--
        }
    }

主要内容:

func main(){
       k:=NewKafka()
       k.Init()
        k.startConsumers(KafkaURL,topic,partitionList)
        IngestionIntoKafka(1000,topic,kafkaURL)
       }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题