我正在尝试改编来自消费者群体示例的代码 github.com/Shopify/sarama
,我正在努力添加一个单元测试来测试 session.MarkMessage()
在 ConsumeClaim
方法(https://github.com/shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#l160).
这是我的代码和一个 consume()
功能:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
var (
addrs = []string{"localhost:9092"}
topic = "my-topic"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
consumer := &Consumer{ready: make(chan bool)}
close := consume(ctx, &wg, consumer)
defer close()
<-consumer.ready
log.Println("Sarama consumer up and running!")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
}
func consume(ctx context.Context, wg *sync.WaitGroup, consumer *Consumer) (close func()) {
config := sarama.NewConfig()
config.Version = sarama.V0_11_0_2 // The version has to be at least V0_10_2_0 to support consumer groups
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumerGroup, err := sarama.NewConsumerGroup(addrs, "my-group", config)
if err != nil {
log.Fatalf("NewConsumerGroup: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := consumerGroup.Consume(ctx, []string{topic}, consumer); err != nil {
log.Panicf("Consume: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
close = func() {
if err := consumerGroup.Close(); err != nil {
log.Panicf("Close: %v", err)
}
}
return
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
handle func([]byte) error
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", message.Value, message.Timestamp, message.Topic)
if consumer.handle != nil {
if err := consumer.handle(message.Value); err != nil {
return fmt.Errorf("handle message %s: %v", message.Value, err)
}
}
session.MarkMessage(message, "")
}
return nil
}
下面是我为它编写的几个单元测试:
package main
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)
func TestConsume(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(addrs, config)
require.NoError(t, err)
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder([]byte("foobar")),
})
require.NoError(t, err)
t.Logf("Sent message to partition %d with offset %d", partition, offset)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
consumer := &Consumer{ready: make(chan bool)}
close := consume(ctx, &wg, consumer)
<-consumer.ready
log.Println("Sarama consumer up and running!")
time.Sleep(1 * time.Second)
cancel()
wg.Wait()
close()
}
func TestConsumeTwice(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(addrs, config)
require.NoError(t, err)
data1, data2 := "foobar1", "foobar2"
for _, data := range []string{data1, data2} {
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("foobar"),
Value: sarama.StringEncoder(data),
})
require.NoError(t, err)
t.Logf("Sent message to partition %d with offset %d", partition, offset)
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
messageReceived := make(chan []byte)
consumer := &Consumer{
ready: make(chan bool),
handle: func(data []byte) error {
messageReceived <- data
fmt.Printf("Received message: %s\n", data)
return nil
},
}
close := consume(ctx, &wg, consumer)
<-consumer.ready
log.Println("Sarama consumer up and running!")
for i := 0; i < 2; i++ {
data := <-messageReceived
switch i {
case 0:
assert.Equal(t, data1, string(data))
case 1:
assert.Equal(t, data2, string(data))
}
}
cancel()
wg.Wait()
close()
}
测试可以在docker容器中运行kafka和zookeeper之后运行,例如 johnnypark/kafka-zookeeper
像这样:
docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper
我正在挣扎的是:如果我把这句话注解掉
session.MarkMessage(message, "")
测试仍然通过。根据https://godoc.org/github.com/shopify/sarama#consumergroupsession, MarkMessage
将消息标记为已使用,但如何在单元测试中测试它?
暂无答案!
目前还没有任何答案,快来回答吧!