Spring Pulsar的多线程使用

bybem2ql  于 2023-02-04  发布在  Spring
关注(0)|答案(1)|浏览(161)

我正在一个项目中工作,从我们现有的ElasticSearch示例读取并在Pulsar中生成消息。如果我以高度多线程的方式执行此操作,而没有任何显式同步,我会多次出现以下日志行:
Message with sequence id X might be a duplicate but cannot be determined at this time.
它由Pulsar Java客户端中的以下代码行生成:https://github.com/apache/pulsar/blob/a4c3034f52f857ae0f4daf5d366ea9e578133bc2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L653
当我在方法中添加一个synchronized块,在脉冲星模板上同步时,错误消失了,但是我的发布率大大下降了。
下面是我的方法的当前工作实现,该方法将Protobuf消息发送到Pulsar:

public <T extends GeneratedMessageV3> CompletableFuture<MessageId> persist(T o) {
        var descriptor = o.getDescriptorForType();
        PulsarPersistTopicSettings settings = pulsarPersistConfig.getSettings(descriptor);
        MessageBuilder<T> messageBuilder = Optional.ofNullable(pulsarPersistConfig.getMessageBuilder(descriptor))
                .orElse(DefaultMessageBuilder.DEFAULT_MESSAGE_BUILDER);
        Optional<ProducerBuilderCustomizer<T>> producerBuilderCustomizerOpt =
                Optional.ofNullable(pulsarPersistConfig.getProducerBuilder(descriptor));
        PulsarOperations.SendMessageBuilder<T> sendMessageBuilder;

            sendMessageBuilder = pulsarTemplate.newMessage(o)
                    .withSchema(Schema.PROTOBUF_NATIVE(o.getClass()))
                    .withTopic(settings.getTopic());
            producerBuilderCustomizerOpt.ifPresent(sendMessageBuilder::withProducerCustomizer);
            sendMessageBuilder.withMessageCustomizer(mb -> messageBuilder.applyMessageBuilderKeys(o, mb));
        synchronized (pulsarTemplate) {
            try {
                return sendMessageBuilder.sendAsync();
            } catch (PulsarClientException re) {
                throw new PulsarPersistException(re);
            }
        }
    }

上述方法的原始版本没有synchronized(pulsarTemplate) { ... }块。它执行得更快,但生成了大量关于重复消息的日志,我知道这是不正确的。添加synchronized块摆脱了日志消息,但减慢了发布速度。
多线程访问PulsarTemplate的最佳实践是什么?有没有更好的方法来实现非常高吞吐量的消息发布?
我是否应该考虑使用React式客户机来代替?
编辑:我已经更新了代码块,以显示避免日志行所需的最小同步,它只是在.sendAsync(...)调用期间进行同步。

lf5gs5x2

lf5gs5x21#

您的使用w/o的synchronized应该工作。我会调查一下,虽然看看我看到什么其他的事情。在此期间,这将是伟大的给予React客户端的尝试。

相关问题