如何在Golang中使用jscontext.newConsumer()创建NATS推送消费者

46qrfjad  于 9个月前  发布在  Go
关注(0)|答案(2)|浏览(90)

在创建js context AddConsumer(topic.ID(),&nats.ConsumerConfig{})创建pull consumer时,有没有办法使用addconsumer创建push consumer

eufgjt7s

eufgjt7s1#

这部分的文档非常糟糕,错误描述性不强。我花了几个小时才弄清楚。下面是我发现的...
要使用AddConsumer创建推送消费者,必须提供DeliverySubject参数,它可以是任何参数,为了简单起见,它可以与消费者名称相同。这部分的文档非常糟糕,没有解释如何使用它。
下面是一个完整的工作示例,它创建一个消费者并从该消费者订阅特定的主题。

nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
    log.Fatalf("could not connect to nats: %s", err)
}

js, err := nc.JetStream()
if err != nil {
    log.Fatal("Could not get jetstream:", err)
}

streamName := "TEST_STREAM"
_, err = js.AddStream(&nats.StreamConfig{
    Name:      streamName,
    Subjects:  []string{"test.>"},
    Retention: nats.InterestPolicy,
    Replicas:  1,
})

if err != nil {
    fmt.Printf("error creating stream %q: %s\n", streamName, err)
    return
}

groupName := "group-1"
consumerName := fmt.Sprintf("%s-consumer", groupName)
_, err = js.AddConsumer(
    streamName,
    &nats.ConsumerConfig{
        Name:           consumerName,
        Durable:        consumerName,
        DeliverSubject: consumerName,
        DeliverGroup:   groupName,
    },
)

if err != nil {
    fmt.Printf("error creating consumer %q: %s\n", consumerName, err)
    return
}

subject := "test.resource.created"
_, err = js.QueueSubscribe(
    subject,
    groupName,
    func(msg *nats.Msg) { 
        fmt.Printf("Jetstream - received %q: %q\n", msg.Subject, msg.Data) 
    },
    nats.Bind(streamName, consumerName),
)

if err != nil {
    fmt.Printf("error subscribing to %q: %s\n", subject, err)
}

字符串

交付组是为了让多个应用示例可以订阅同一个消费者,并且事件只会随机交付给其中一个,也就是说,如果您想横向伸缩应用。

如果你想限制消费者接收的事件。那么你应该向消费者配置提供SubjectFilter。每个消费者将持久化他们的过滤器匹配的所有事件,或者如果没有提供过滤器,则持久化流中的所有事件。
创建消费者后,您可以使用任何JetStream订阅方法(js.SubscribeSyncjs.QueueSubscribeSyncjs.Subscribejs.QueueSubscribe,...)订阅它,但您必须提供nats选项将订阅绑定到消费者,这是使用nats.Bind(streamName, consumerName)完成的
订阅时,可以将主题名称保留为空字符串,以订阅来自消费者的所有事件。

chhqkbe1

chhqkbe12#

创造消费者。

js.AddConsumer("ORDERS", &nats.ConsumerConfig{
    Durable: "MONITOR",
})

字符串
AddConsumer将消费者添加到流中。

AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

相关问题