我正在一个项目中工作,从我们现有的ElasticSearch示例读取并在Pulsar中生成消息。如果我以高度多线程的方式执行此操作,而没有任何显式同步,我会多次出现以下日志行:Message with sequence id X might be a duplicate but cannot be determined at this time.
它由Pulsar Java客户端中的以下代码行生成:https://github.com/apache/pulsar/blob/a4c3034f52f857ae0f4daf5d366ea9e578133bc2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L653
当我在方法中添加一个synchronized块,在脉冲星模板上同步时,错误消失了,但是我的发布率大大下降了。
下面是我的方法的当前工作实现,该方法将Protobuf消息发送到Pulsar:
public <T extends GeneratedMessageV3> CompletableFuture<MessageId> persist(T o) {
var descriptor = o.getDescriptorForType();
PulsarPersistTopicSettings settings = pulsarPersistConfig.getSettings(descriptor);
MessageBuilder<T> messageBuilder = Optional.ofNullable(pulsarPersistConfig.getMessageBuilder(descriptor))
.orElse(DefaultMessageBuilder.DEFAULT_MESSAGE_BUILDER);
Optional<ProducerBuilderCustomizer<T>> producerBuilderCustomizerOpt =
Optional.ofNullable(pulsarPersistConfig.getProducerBuilder(descriptor));
PulsarOperations.SendMessageBuilder<T> sendMessageBuilder;
sendMessageBuilder = pulsarTemplate.newMessage(o)
.withSchema(Schema.PROTOBUF_NATIVE(o.getClass()))
.withTopic(settings.getTopic());
producerBuilderCustomizerOpt.ifPresent(sendMessageBuilder::withProducerCustomizer);
sendMessageBuilder.withMessageCustomizer(mb -> messageBuilder.applyMessageBuilderKeys(o, mb));
synchronized (pulsarTemplate) {
try {
return sendMessageBuilder.sendAsync();
} catch (PulsarClientException re) {
throw new PulsarPersistException(re);
}
}
}
上述方法的原始版本没有synchronized(pulsarTemplate) { ... }
块。它执行得更快,但生成了大量关于重复消息的日志,我知道这是不正确的。添加synchronized块摆脱了日志消息,但减慢了发布速度。
多线程访问PulsarTemplate的最佳实践是什么?有没有更好的方法来实现非常高吞吐量的消息发布?
我是否应该考虑使用React式客户机来代替?
编辑:我已经更新了代码块,以显示避免日志行所需的最小同步,它只是在.sendAsync(...)
调用期间进行同步。
1条答案
按热度按时间lf5gs5x21#
您的使用w/o的
synchronized
应该工作。我会调查一下,虽然看看我看到什么其他的事情。在此期间,这将是伟大的给予React客户端的尝试。