在创建js context AddConsumer(topic.ID(),&nats.ConsumerConfig{})创建pull consumer时,有没有办法使用addconsumer创建push consumer
eufgjt7s1#
这部分的文档非常糟糕,错误描述性不强。我花了几个小时才弄清楚。下面是我发现的...要使用AddConsumer创建推送消费者,必须提供DeliverySubject参数,它可以是任何参数,为了简单起见,它可以与消费者名称相同。这部分的文档非常糟糕,没有解释如何使用它。下面是一个完整的工作示例,它创建一个消费者并从该消费者订阅特定的主题。
AddConsumer
DeliverySubject
nc, err := nats.Connect("nats://localhost:4222") if err != nil { log.Fatalf("could not connect to nats: %s", err) } js, err := nc.JetStream() if err != nil { log.Fatal("Could not get jetstream:", err) } streamName := "TEST_STREAM" _, err = js.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{"test.>"}, Retention: nats.InterestPolicy, Replicas: 1, }) if err != nil { fmt.Printf("error creating stream %q: %s\n", streamName, err) return } groupName := "group-1" consumerName := fmt.Sprintf("%s-consumer", groupName) _, err = js.AddConsumer( streamName, &nats.ConsumerConfig{ Name: consumerName, Durable: consumerName, DeliverSubject: consumerName, DeliverGroup: groupName, }, ) if err != nil { fmt.Printf("error creating consumer %q: %s\n", consumerName, err) return } subject := "test.resource.created" _, err = js.QueueSubscribe( subject, groupName, func(msg *nats.Msg) { fmt.Printf("Jetstream - received %q: %q\n", msg.Subject, msg.Data) }, nats.Bind(streamName, consumerName), ) if err != nil { fmt.Printf("error subscribing to %q: %s\n", subject, err) }
字符串
交付组是为了让多个应用示例可以订阅同一个消费者,并且事件只会随机交付给其中一个,也就是说,如果您想横向伸缩应用。
如果你想限制消费者接收的事件。那么你应该向消费者配置提供SubjectFilter。每个消费者将持久化他们的过滤器匹配的所有事件,或者如果没有提供过滤器,则持久化流中的所有事件。创建消费者后,您可以使用任何JetStream订阅方法(js.SubscribeSync,js.QueueSubscribeSync,js.Subscribe,js.QueueSubscribe,...)订阅它,但您必须提供nats选项将订阅绑定到消费者,这是使用nats.Bind(streamName, consumerName)完成的订阅时,可以将主题名称保留为空字符串,以订阅来自消费者的所有事件。
SubjectFilter
js.SubscribeSync
js.QueueSubscribeSync
js.Subscribe
js.QueueSubscribe
nats.Bind(streamName, consumerName)
chhqkbe12#
创造消费者。
js.AddConsumer("ORDERS", &nats.ConsumerConfig{ Durable: "MONITOR", })
字符串AddConsumer将消费者添加到流中。
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
型
2条答案
按热度按时间eufgjt7s1#
这部分的文档非常糟糕,错误描述性不强。我花了几个小时才弄清楚。下面是我发现的...
要使用
AddConsumer
创建推送消费者,必须提供DeliverySubject
参数,它可以是任何参数,为了简单起见,它可以与消费者名称相同。这部分的文档非常糟糕,没有解释如何使用它。下面是一个完整的工作示例,它创建一个消费者并从该消费者订阅特定的主题。
字符串
交付组是为了让多个应用示例可以订阅同一个消费者,并且事件只会随机交付给其中一个,也就是说,如果您想横向伸缩应用。
如果你想限制消费者接收的事件。那么你应该向消费者配置提供
SubjectFilter
。每个消费者将持久化他们的过滤器匹配的所有事件,或者如果没有提供过滤器,则持久化流中的所有事件。创建消费者后,您可以使用任何JetStream订阅方法(
js.SubscribeSync
,js.QueueSubscribeSync
,js.Subscribe
,js.QueueSubscribe
,...)订阅它,但您必须提供nats选项将订阅绑定到消费者,这是使用nats.Bind(streamName, consumerName)
完成的订阅时,可以将主题名称保留为空字符串,以订阅来自消费者的所有事件。
chhqkbe12#
创造消费者。
字符串
AddConsumer将消费者添加到流中。
型