I am trying to connect to Kafka using JMS. I followed this guide to use the Payara Kafka connector. This works on WildFly, but I cannot get it to work on Open Liberty.
This is the server.xml:
<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}kafka-rar-0.5.0.rar"/>
<jmsTopicConnectionFactory jndiName="JMSTopicFactory">
    <properties.kafkajmsra
        bootstrapServerConfig="kafka:9092"/>
</jmsTopicConnectionFactory>
<jmsTopic id="kafkaTopic" jndiName="JmsTopic">
    <properties.kafkajmsra topicName="demoTopic" />
</jmsTopic>
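With this configuration, I get a NullPointerException when I try to inject these components. The JNDI names can be found, but they cannot be used with these parameters.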
@Resource(lookup = "JMSTopicFactory")
private TopicConnectionFactory jmsTopicFactory;
@Resource(lookup = "JMSTopic")
private Topic jmsTopic;
Am I missing something in the server.xml?
I also tried the default JMS connector. It does connect to Kafka, but the connection is rejected, and on the Kafka side it tells me:
[2020-05-31 20:05:27,134] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -1091633152)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:103)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:893)
at kafka.network.Processor.run(SocketServer.scala:792)
at java.lang.Thread.run(Thread.java:748)
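A note for reading this log: as far as I understand it, every Kafka request starts with a 4-byte big-endian length, and the broker rejects a negative value with InvalidReceiveException. Here is a minimal sketch that turns the reported size back into the four bytes the broker actually received. The class name ReceiveSizeDecoder and the helper toHexBytes are made up for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;

public class ReceiveSizeDecoder {

    /** Returns the four big-endian bytes that encode the given size, as hex. */
    static String toHexBytes(int size) {
        // ByteBuffer is big-endian by default, matching network byte order.
        byte[] bytes = ByteBuffer.allocate(4).putInt(size).array();
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Size taken from the broker log above.
        System.out.println(toHexBytes(-1091633152)); // prints "BE EF 00 00"
    }
}
```

The bytes BE EF 00 00 do not look like the start of a Kafka request. That would be consistent with the default JMS client speaking its own, non-Kafka wire protocol to port 9092, but this is my interpretation and not something the log states.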
Edit:
I changed the server.xml. It now looks like this:
<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}/kafka-rar-0.4.0.rar"/>
<connectionFactory jndi="java:app/KafkaConnectionFactory"
    interfaceName="fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory"
    resourceAdapter="liberty/wlp/usr/shared/resources/kafka-rar-0.4.0.rar">
</connectionFactory>
The Java code looks like this:
@ApplicationScoped
public class TopicProducer {
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducer.class);

    public TopicProducer() throws Exception {
        LOG.info("Starting TopicProducer");
    }

    @Resource(lookup = "java:app/KafkaConnectionFactory")
    KafkaConnectionFactory kafkaConnectionFactory;

    public void send(final String msg) {
        try (KafkaConnection connection = kafkaConnectionFactory.createConnection()) {
            LOG.info("Send message: {}", msg);
            connection.send(new ProducerRecord("demoTopic", msg));
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
But now I get a NullPointerException on the @Resource field. My guess is that the resource adapter cannot be found.
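To check whether the name resolves at all, one option is a programmatic JNDI lookup, which reports a NamingException with a reason instead of leaving the field null. This is only a diagnostic sketch using the standard javax.naming API. The class JndiProbe and the method describeLookup are hypothetical names introduced here, not part of Open Liberty or the Payara connector.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JndiProbe {

    /** Looks up a JNDI name and describes the outcome instead of throwing. */
    static String describeLookup(String jndiName) {
        try {
            Object resource = new InitialContext().lookup(jndiName);
            String type = (resource == null) ? "null" : resource.getClass().getName();
            return "FOUND " + jndiName + " -> " + type;
        } catch (NamingException e) {
            // The exception type and message usually say why the name is unavailable.
            return "FAILED " + jndiName + " -> "
                    + e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Outside an application server there is no naming provider,
        // so this reports a failure; inside the server the result differs.
        System.out.println(describeLookup("java:app/KafkaConnectionFactory"));
    }
}
```

Inside the application, describeLookup could be called from a @PostConstruct method of TopicProducer and the result logged, since java:app names only resolve in the context of an application component.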