Go语言 Nats Jetstream消息发送两次

xvw2m8pv  于 11个月前  发布在  Go
关注(0)|答案(1)|浏览(127)

我想要一个流(主总线),它获取所有信号(signals.>),一个消费者,它从东边获取所有信号(signals.east.>)。几个应用程序从那个消费者那里消费。每个应用程序可能有1-5个示例,但我只想要其中一个示例接收那个信号(没有重复和顺序问题)。
我有一个流,配置如下:

Configuration:

             Subjects: signals.>
     Acknowledgements: true
            Retention: File - Interest
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 40s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: 40.00s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

Cluster Information:
                 Name: test-east
               Leader: nats-0

State:
             Messages: 0
                Bytes: 0 B
             FirstSeq: 632
              LastSeq: 631 @ 2022-07-26T23:58:13 UTC
     Active Consumers: 10

字符串
下面是消费者的配置:

Configuration:

        Durable Name: e5
    Delivery Subject: _INBOX.SZqS5641roDOlg7tlbea4w
      Filter Subject: signals.east.test.>
      Deliver Policy: All
 Deliver Queue Group: e5
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1
        Flow Control: false

Cluster Information:

                Name: test-east
              Leader: nats-0

State:

   Last Delivered Message: Consumer sequence: 13 Stream sequence: 591 Last delivery: 33m17s ago
     Acknowledgment floor: Consumer sequence: 13 Stream sequence: 591 Last Ack: 33m17s ago
         Outstanding Acks: 0 out of maximum 1
     Redelivered Messages: 0
     Unprocessed Messages: 0
          Active Interest: No interest


我试了这个代码:

sub, err := js.PullSubscribe(subj, consName)
    if err != nil {
        fmt.Println(err)
        return
    }

    if err != nil {
        log.Fatalf("Error setting pending limits on the subscriber: %v", err)
    }

    ctx := context.TODO()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        msgs, err := sub.Fetch(1, nats.Context(ctx))
        for _, msg := range msgs {
            msg.AckSync()
            if err != nil {
                fmt.Println(err)
                log.Fatal(err)
            }
            fmt.Printf(fmt.Sprintf("pull-sub Msg:%s- %s\n", msg.Header.Get(nats.MsgIdHdr), string(msg.Data)))
        }
    }


但是每个示例都得到相同的消息两次,不管是什么。我也尝试了这个代码:

func jetsubscribeConsumer(js nats.JetStreamContext, subj, queue string) (err error) {
    ctx := context.TODO()

    handler := func(m *nats.Msg) {
        m.AckSync()
        mdata, _ := m.Metadata()
        fmt.Println(mdata.Stream, mdata.Consumer, mdata.Domain, mdata.NumDelivered, mdata.NumPending, mdata.Sequence.Consumer)
        fmt.Println(queue, string(m.Data))
    }

    _, err = js.QueueSubscribe(subj, queue, handler, nats.MaxAckPending(1), nats.ManualAck())
    if err != nil {
        fmt.Println(err)
        return
    }

    <-ctx.Done()

    return nil
}


下面是一个应用程序a的结果

pull-sub Msg:0- xyz 0- 16:57:05
pull-sub Msg:0- xyz 0- 16:57:05
pull-sub Msg:2- xyz 2- 16:57:05
pull-sub Msg:2- xyz 2- 16:57:05
pull-sub Msg:4- xyz 4- 16:57:05
pull-sub Msg:4- xyz 4- 16:57:05
pull-sub Msg:6- xyz 6- 16:57:05
pull-sub Msg:6- xyz 6- 16:57:05
pull-sub Msg:8- xyz 8- 16:57:05
pull-sub Msg:8- xyz 8- 16:57:05


下面是应用程序a的示例2的结果

pull-sub Msg:1- xyz 1- 16:58:13
pull-sub Msg:1- xyz 1- 16:58:13
pull-sub Msg:3- xyz 3- 16:58:13
pull-sub Msg:3- xyz 3- 16:58:13
pull-sub Msg:5- xyz 5- 16:58:13
pull-sub Msg:5- xyz 5- 16:58:13
pull-sub Msg:7- xyz 7- 16:58:13
pull-sub Msg:7- xyz 7- 16:58:13
pull-sub Msg:9- xyz 9- 16:58:13
pull-sub Msg:9- xyz 9- 16:58:13


这就是我如何出版

func pushMsg(js nats.JetStreamContext, topic string) {
    for i := 0; i < 10; i++ {
        x := nats.NewMsg(topic)
        x.Data = []byte(
            fmt.Sprintf("xyz %v- %s", i, time.Now().Format("15:04:05")),
        )
        ack, err := js.PublishMsg(x)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("%#v\n", ack)
        fmt.Println(i, " ", string(x.Data), x.Header.Get(nats.MsgIdHdr))
    }
}

ecfdbz9o

ecfdbz9o1#

配置的消费者是一个基于推送的队列消费者,而基于拉取的消费者是一个单独的消费者,所以你有两个消费者。

相关问题