我想让 Kafka 的消费者(consumer)和生产者(producer)同时运行。
我想先启动消费者(consumer),然后再向主题(topic)发送消息。消费者应该一直等待消息(直到某个截止时间,或者在后台无限期运行),与此同时我可以开始生产消息。我使用的是 segmentio 的 kafka-go 库。我尝试用 goroutine 来实现,但是失败了。消费者:
// Init allocates the state the consumer needs before it can be used: the
// buffered channel that carries messages read from Kafka and the per-key
// counter map.
//
// The channel is buffered for 300000 messages, so senders only block once that
// many messages are queued. The counter map is allocated only while it is
// still nil, so calling Init never discards counts that are already present;
// without this allocation the first write to the map would panic.
func (k *kafkaMessage) Init() {
	k.msgs = make(chan kafka.Message, 300000)
	if k.messageCountTenant == nil {
		k.messageCountTenant = make(map[uint32]int64)
	}
}
// kafkaMessage holds the consumer-side state: the channel that carries
// messages read from Kafka and a per-key message counter.
type kafkaMessage struct {
	// msgs carries messages from the partition readers to the processing loop.
	msgs chan kafka.Message

	// messageCountTenant maps a uint32 key to a message count.
	// NOTE(review): the key is presumably a tenant ID — confirm against the
	// code that updates this map.
	messageCountTenant map[uint32]int64
}
// NewKafka returns a pointer to a zero-valued kafkaMessage.
//
// The returned value has a nil message channel; call Init before starting any
// reader or processing loop.
func NewKafka() *kafkaMessage {
	k := new(kafkaMessage)
	return k
}
// Run starts the processing loop in a background goroutine and returns
// immediately, so the caller can go on to produce messages while consumption
// continues.
//
// The goroutine drains k.msgs until the channel is closed, then reports that
// the channel was closed and logs the per-key counters. Messages are currently
// only drained; per-message handling belongs inside the loop.
//
// Compared with the previous version:
//   - the WaitGroup is gone: Done was deferred without a matching Add, so Wait
//     never waited and a returning goroutine would have panicked with a
//     negative counter;
//   - the loop ends when the channel is closed instead of spinning forever on
//     zero values;
//   - the counters are logged after the drain finishes rather than right after
//     the goroutine is launched.
//
// The goroutine lives until k.msgs is closed; if nothing closes the channel it
// runs for the lifetime of the process.
func (k *kafkaMessage) Run() {
	go func() {
		// Ranging over the channel blocks while it is empty and stops once it
		// has been closed and fully drained.
		for range k.msgs {
		}

		fmt.Println(errors.New("reader didn't finish pending batches"))
		log.GetLogger().Println(k.messageCountTenant)
	}()
}
// ConsumeFromKafka starts a background reader for a single partition of topic
// on the broker at kafkaURL and forwards every message it reads to k.msgs.
//
// The reader is positioned at the first available offset of the partition
// before reading starts. If the offset cannot be set, the reader is closed and
// the error is returned; no goroutine is started in that case. Otherwise the
// function returns nil as soon as the goroutine has been launched.
//
// Read errors are logged and the read is retried. The send on k.msgs blocks
// when the channel buffer is full, and panics if k.msgs has been closed.
//
// NOTE(review): there is no way to stop the goroutine. The previous version
// created a cancellable context but discarded its cancel func, so the
// cancellation branch could never fire; it also kept a WaitGroup that was
// never waited on. Both were removed. Adding a real shutdown path needs a
// context (or stop signal) supplied by the caller.
func (k *kafkaMessage) ConsumeFromKafka(kafkaURL string, topic string, partition uint32) error {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{kafkaURL},
		Topic:     topic,
		Partition: int(partition),
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})

	// kafka.FirstOffset is the named form of the -2 sentinel used before.
	if err := reader.SetOffset(kafka.FirstOffset); err != nil {
		// Best-effort cleanup: the offset error is the one worth reporting.
		_ = reader.Close()
		return fmt.Errorf("setting start offset for topic %q partition %d: %w", topic, partition, err)
	}

	ctx := context.Background()
	go func() {
		for {
			m, err := reader.ReadMessage(ctx)
			if err != nil {
				log.Error(eventConfig.Name, "Error from reader:", err)
				continue
			}
			k.msgs <- m
		}
	}()

	return nil
}
func (k *kafkaMsg) startConsumer(KafkaURL string,topic string,partitionlist []uint32){
for _, partition := range partitionList {
_= k.ConsumeFromKafka(kafkURL, topic,partition)
}
k.Run()
}
生产者:
// IngestionIntoKafka sends count messages to topic on the broker at KafkaURL.
//
// count is an int: the body compares and decrements it, and the caller passes
// a numeric literal, so the previous string type could not compile. A count of
// zero or less sends nothing.
//
// The writer is closed exactly once, when the function returns. Previously the
// Close call was deferred inside the loop, which registered one close per
// message. A failed send is logged and the loop continues with the next
// message, so one bad send does not abort the batch.
//
// NOTE(review): getKafkaWriter, getMessages and SendDataToKafka are defined
// elsewhere; this assumes SendDataToKafka returns an error and that the writer
// has a Close method, as the original call sites imply.
func IngestionIntoKafka(count int, topic, KafkaURL string) {
	kafkaWriter := getKafkaWriter(KafkaURL, topic)
	defer kafkaWriter.Close()

	for ; count > 0; count-- {
		event := getMessages() // this function is returning messages
		if err := SendDataToKafka(event, kafkaWriter); err != nil {
			log.GetLogger().Println(err)
		}
	}
}
main 函数:
// main starts the consumers first, then produces 1000 messages, and finally
// blocks so the background consumer goroutines keep running.
//
// The method is called by its defined name, startConsumer; the previous call
// to startConsumers referred to a method that does not exist.
//
// NOTE(review): KafkaURL and kafkaURL are two different identifiers and
// neither is declared in the visible code — confirm which one exists and use
// it in both calls.
func main() {
	k := NewKafka()
	k.Init()
	k.startConsumer(KafkaURL, topic, partitionList)
	IngestionIntoKafka(1000, topic, kafkaURL)

	// Returning from main ends the process and with it every consumer
	// goroutine, usually before the freshly produced messages are read.
	// Block forever so the consumers keep running in the background; stop the
	// program with an interrupt signal.
	select {}
}
暂无答案!
目前还没有任何答案,快来回答吧!