我想要一个流(主总线),它获取所有信号(signals.>),一个消费者,它从东边获取所有信号(signals.east.>)。几个应用程序从那个消费者那里消费。每个应用程序可能有1-5个示例,但我只想要其中一个示例接收那个信号(没有重复和顺序问题)。
我有一个流,配置如下:
Configuration:
Subjects: signals.>
Acknowledgements: true
Retention: File - Interest
Replicas: 1
Discard Policy: Old
Duplicate Window: 40s
Allows Msg Delete: true
Allows Purge: true
Allows Rollups: false
Maximum Messages: unlimited
Maximum Bytes: unlimited
Maximum Age: 40.00s
Maximum Message Size: unlimited
Maximum Consumers: unlimited
Cluster Information:
Name: test-east
Leader: nats-0
State:
Messages: 0
Bytes: 0 B
FirstSeq: 632
LastSeq: 631 @ 2022-07-26T23:58:13 UTC
Active Consumers: 10
字符串
下面是消费者的配置:
Configuration:
Durable Name: e5
Delivery Subject: _INBOX.SZqS5641roDOlg7tlbea4w
Filter Subject: signals.east.test.>
Deliver Policy: All
Deliver Queue Group: e5
Ack Policy: Explicit
Ack Wait: 30s
Replay Policy: Instant
Max Ack Pending: 1
Flow Control: false
Cluster Information:
Name: test-east
Leader: nats-0
State:
Last Delivered Message: Consumer sequence: 13 Stream sequence: 591 Last delivery: 33m17s ago
Acknowledgment floor: Consumer sequence: 13 Stream sequence: 591 Last Ack: 33m17s ago
Outstanding Acks: 0 out of maximum 1
Redelivered Messages: 0
Unprocessed Messages: 0
Active Interest: No interest
型
我试了这个代码:
sub, err := js.PullSubscribe(subj, consName)
if err != nil {
fmt.Println(err)
return
}
if err != nil {
log.Fatalf("Error setting pending limits on the subscriber: %v", err)
}
ctx := context.TODO()
for {
select {
case <-ctx.Done():
return
default:
}
msgs, err := sub.Fetch(1, nats.Context(ctx))
for _, msg := range msgs {
msg.AckSync()
if err != nil {
fmt.Println(err)
log.Fatal(err)
}
fmt.Printf(fmt.Sprintf("pull-sub Msg:%s- %s\n", msg.Header.Get(nats.MsgIdHdr), string(msg.Data)))
}
}
型
但是每个示例都得到相同的消息两次,不管是什么。我也尝试了这个代码:
func jetsubscribeConsumer(js nats.JetStreamContext, subj, queue string) (err error) {
ctx := context.TODO()
handler := func(m *nats.Msg) {
m.AckSync()
mdata, _ := m.Metadata()
fmt.Println(mdata.Stream, mdata.Consumer, mdata.Domain, mdata.NumDelivered, mdata.NumPending, mdata.Sequence.Consumer)
fmt.Println(queue, string(m.Data))
}
_, err = js.QueueSubscribe(subj, queue, handler, nats.MaxAckPending(1), nats.ManualAck())
if err != nil {
fmt.Println(err)
return
}
<-ctx.Done()
return nil
}
型
下面是一个应用程序a的结果
pull-sub Msg:0- xyz 0- 16:57:05
pull-sub Msg:0- xyz 0- 16:57:05
pull-sub Msg:2- xyz 2- 16:57:05
pull-sub Msg:2- xyz 2- 16:57:05
pull-sub Msg:4- xyz 4- 16:57:05
pull-sub Msg:4- xyz 4- 16:57:05
pull-sub Msg:6- xyz 6- 16:57:05
pull-sub Msg:6- xyz 6- 16:57:05
pull-sub Msg:8- xyz 8- 16:57:05
pull-sub Msg:8- xyz 8- 16:57:05
型
下面是应用程序a的示例2的结果
pull-sub Msg:1- xyz 1- 16:58:13
pull-sub Msg:1- xyz 1- 16:58:13
pull-sub Msg:3- xyz 3- 16:58:13
pull-sub Msg:3- xyz 3- 16:58:13
pull-sub Msg:5- xyz 5- 16:58:13
pull-sub Msg:5- xyz 5- 16:58:13
pull-sub Msg:7- xyz 7- 16:58:13
pull-sub Msg:7- xyz 7- 16:58:13
pull-sub Msg:9- xyz 9- 16:58:13
pull-sub Msg:9- xyz 9- 16:58:13
型
这就是我如何出版
func pushMsg(js nats.JetStreamContext, topic string) {
for i := 0; i < 10; i++ {
x := nats.NewMsg(topic)
x.Data = []byte(
fmt.Sprintf("xyz %v- %s", i, time.Now().Format("15:04:05")),
)
ack, err := js.PublishMsg(x)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("%#v\n", ack)
fmt.Println(i, " ", string(x.Data), x.Header.Get(nats.MsgIdHdr))
}
}
型
1条答案
按热度按时间ecfdbz9o1#
配置的消费者是一个基于推送的队列消费者,而基于拉取的消费者是一个单独的消费者,所以你有两个消费者。