Spring Boot NATS JetStream的侦听器

cigdeys3  于 2022-11-23  发布在  Spring
关注(0)|答案(2)|浏览(181)

有人能帮助如何配置NATS急流订阅在Spring Boot 异步示例:寻找一个等价的注解,如@kafkalistener表示Nats jetstream
我可以使用端点提取消息,但是当尝试使用pushSubscription提取消息时,未调用dispatcherhandler。需要知道如何使侦听器处于活动状态,并在消息发布到主题后立即使用消息。
任何关于这方面的见解/例子都将是有帮助的,提前感谢。

rkue9o1l

rkue9o1l1#

我不知道你的JetStream保留策略是什么,也不知道你想要订阅的方式。但是我有WorkQueuepolicy推送订阅的示例代码,希望这能对你有所帮助。

public static void subscribe(String streamName, String subjectKey,
                             String queueName, IMessageHandler iMessageHandler) throws IOException,
        InterruptedException, JetStreamApiException {
    long s = System.currentTimeMillis();
    Connection nc = Nats.connect(options);
    long e = System.currentTimeMillis();
    logger.info("Nats Connect in " + (e - s) + " ms");
    JetStream js = nc.jetStream();
    Dispatcher disp = nc.createDispatcher();
    MessageHandler handler = (msg) -> {
        try {
            iMessageHandler.onMessageReceived(msg);
        } catch (Exception exc) {
            msg.nak();
        }
    };
    ConsumerConfiguration cc = ConsumerConfiguration.builder()
            .durable(queueName)
            .deliverGroup(queueName)
            .maxDeliver(3)
            .ackWait(Duration.ofMinutes(2))
            .build();
    PushSubscribeOptions so = PushSubscribeOptions.builder()
            .stream(streamName)
            .configuration(cc)
            .build();
    js.subscribe(subjectKey, disp, handler, false, so);
    System.out.println("NatsUtil: " + durableName + "subscribe");
}

IMessageHandler是我的自定义接口,用于处理nats.io收到的消息。

9rbhqvlz

9rbhqvlz2#

首先,配置NATS连接。在这里你将指定所有的连接细节,如服务器地址、身份验证选项、连接级回调等。

Connection natsConnection = Nats.connect(
            new Options.Builder()
                    .server("nats://localhost:4222")
                    .connectionListener((connection, eventType) -> {})
                    .errorListener(new ErrorListener(){})
                    .build());

然后构造JetStream示例

JetStream jetStream = natsConnection.jetStream();

现在您可以订阅主题。请注意,JetStream使用者可以是持久的或短暂的,可以根据推送或拉取逻辑工作。请参考NATS文档(https://docs.nats.io/nats-concepts/jetstream/consumers),为您的特定用例做出适当的选择。以下示例构造了一个持久的推送使用者:

//Subscribe to a subject.
    String subject = "my-subject";

    //queues are analogous to Kafka consumer groups, i.e. consumers belonging
    //to the same queue (or, better to say, reading the same queue) will get
    //only one instance of each message from the corresponding subject
    //and only one of those consumers will be chosen to process the message
    String queueName = "my-queue";

    //Choosing delivery policy is analogous to setting the current offset
    //in a partition for a consumer or consumer group in Kafka.
    DeliverPolicy deliverPolicy = DeliverPolicy.New;
    PushSubscribeOptions subscribeOptions = ConsumerConfiguration.builder()
            .durable(queueName)
            .deliverGroup(queueName)
            .deliverPolicy(deliverPolicy)
            .buildPushSubscribeOptions();
    Subscription subscription = jetStream.subscribe(
            subject,
            queueName,
            natsConnection.createDispatcher(),
            natsMessage -> {
                //This callback will be called for incoming messages
                //asynchronously. Every subscription configured this
                //way will be backed by its own thread, that will be
                //used to call this callback.
            },
            true,  //true if you want received messages to be acknowledged
                   //automatically, otherwise you will have to call
                   //natsMessage.ack() manually in the above callback function
            subscribeOptions);

至于声明式API(即某种形式的@NatsListener注解,类似于Apache Kafka项目的Spring中的@KafkaListener),Spring中没有现成可用的注解。如果您觉得绝对需要它,您可以自己编写一个注解,前提是您熟悉SpringBeanPostProcessor-s或其他可以帮助实现此目的的扩展机制。或者,您可以参考第三方库,看起来很多人(包括我自己)在从Kafka转向NATS的时候感到有点不舒服,所以他们尝试着把Kafka世界的惯常做法带过来。一些例子可以在github上找到:

可能还有其他人。

相关问题