How do I connect to Kafka via JMS on Open Liberty?

jhiyze9q, posted 2021-06-05 in Kafka

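I am trying to connect to Kafka using JMS. I followed this guide, which uses the Payara Kafka connector. That works on WildFly, but I cannot get it to work on Open Liberty.
This is my server.xml: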

<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}kafka-rar-0.5.0.rar"/>

<jmsTopicConnectionFactory jndiName="JMSTopicFactory">
     <properties.kafkajmsra
                bootstrapServerConfig="kafka:9092"/>
</jmsTopicConnectionFactory>

<jmsTopic id="kafkaTopic" jndiName="JmsTopic">
     <properties.kafkajmsra topicName="demoTopic" />
</jmsTopic>

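With this configuration, I get a NullPointerException when I try to inject these components. The JNDI names can be found, but the configured properties are not applied.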

@Resource(lookup = "JMSTopicFactory")
private TopicConnectionFactory jmsTopicFactory;

@Resource(lookup = "JMSTopic")
private Topic jmsTopic;

Am I missing something in server.xml?
I also tried the default JMS connector. It does reach Kafka, but the connection is rejected, and the Kafka broker logs the following:

[2020-05-31 20:05:27,134] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -1091633152)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:103)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
 at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
 at kafka.network.Processor.poll(SocketServer.scala:893)
 at kafka.network.Processor.run(SocketServer.scala:792)
 at java.lang.Thread.run(Thread.java:748)
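
The Kafka wire protocol frames every request with a 4-byte big-endian size, so a negative size like the one in this log usually means the broker read the first four bytes of some other protocol as a length. As a minimal sketch (the class and method names are hypothetical, not part of Kafka or Open Liberty), the reported size can be turned back into the bytes the broker actually received:

```java
import java.nio.ByteBuffer;

public class SizePrefixDemo {

    // Returns the four big-endian bytes that encode the given int, as hex.
    static String toHexBytes(int size) {
        // ByteBuffer uses big-endian byte order by default.
        byte[] bytes = ByteBuffer.allocate(4).putInt(size).array();
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            // Mask to avoid sign extension when widening the byte to int.
            sb.append(String.format("%02X ", b & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // The size reported in the broker log above.
        System.out.println(toHexBytes(-1091633152)); // prints "BE EF 00 00"
    }
}
```

Bytes that do not look like a plausible request length would be consistent with a client that is not speaking the Kafka protocol, which is what one might expect from a default JMS connector pointed at port 9092. That reading is an assumption, not something the log confirms.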

Edit:
I changed server.xml, and it now looks like this:

<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}/kafka-rar-0.4.0.rar"/>

    <connectionFactory jndi="java:app/KafkaConnectionFactory"
                       interfaceName="fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory"
                       resourceAdapter="liberty/wlp/usr/shared/resources/kafka-rar-0.4.0.rar">
    </connectionFactory>

The Java code looks like this:

@ApplicationScoped
public class TopicProducer {

  private static final Logger LOG = LoggerFactory.getLogger(TopicProducer.class);

  public TopicProducer() throws Exception {
    LOG.info("Starting TopicProducer");
  }

  @Resource(lookup = "java:app/KafkaConnectionFactory")
  KafkaConnectionFactory kafkaConnectionFactory;

  public void send(final String msg) {
    try (KafkaConnection connection = kafkaConnectionFactory.createConnection()) {

      LOG.info("Send message: {}", msg);

      connection.send(new ProducerRecord("demoTopic", msg));
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
    }
  }
}

But now I get a NullPointerException on the field annotated with @Resource. My guess is that the resource adapter cannot be found.
