apache-kafka Kafka的消费者并没有消费

x8goxv8g  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(192)

我正在尝试创建一个应用程序,它将使用一个Kafka主题,有10个分区。我没有定义消费者的分区数为application.properties,因为应用程序将部署在OpenShift中,我希望OpenShift进行负载平衡。问题是,当我启动应用程序时,它没有使用任何东西。代码非常简单,我真的不知道哪里出了问题:
这里是消费者,日志什么也没有显示:

@Incoming("plac-fate")
    @Transactional
    public void consume(NFeDistSVBAPayload payload) throws PlacException {
        logger.info("Consumindo payload: " + payload);
        service.criaNFeDistSVBAEntity(payload);
        logger.info("Payload com nrProtocolo '" + payload.getNrProtocolo() + "' consumido com sucesso.");
    }

我也有作为反序列化器它的日志没有显示任何东西:

@Override
    public NFeDistSVBAPayload deserialize(String topic, byte[] data) {
        logger.info("Deserializando mensagem do topico: " + topic);
        var strPayload = new String(data, StandardCharsets.UTF_8);
        var module = new JaxbAnnotationModule();
        var mapper = JsonMapper.builder()
        .enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
        .build();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.registerModule(module);
        try {
            var payload =  mapper.readValue(strPayload, NFeDistSVBAPayload.class);
            return payload;
        } catch (JsonProcessingException e) {
            logger.error(e);
            return null;
        }
    }

这里是àpplication.properties

quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=${PLAC_KAFKA_URL}
mp.messaging.incoming.plac-fate.connector=smallrye-kafka
mp.messaging.incoming.plac-fate.value.deserializer=br.gov.pr.fazenda.plac.dominio.utils.DistNFePayloadSVBADeserializer
mp.messaging.incoming.plac-fate.group.id=plac-fate-consumer

下面是应用程序日志:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-10-11 18:53:42,919 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'plac-fate': [plac-fate]
2022-10-11 18:53:42,949 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18214: Key deserializer omitted, using String as default
2022-10-11 18:53:43,764 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-plac-fate, connected to Kafka brokers 'kafka:9092', belongs to the 'plac-fate-consumer' consumer group and is configured to poll records from [plac-fate]
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) consumer 1.0-SNAPSHOT on JVM (powered by Quarkus 2.12.3.Final) started in 12.193s. Listening on: http://localhost:8080
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-10-11 18:53:43,874 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, hibernate-orm-panache, jdbc-oracle, kafka-client, micrometer, narayana-jta, qute, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

正如您所看到的,日志显示已创建并连接了使用者,但它什么也没做。

rekjcdws

rekjcdws1#

使用者默认从主题的最后开始(此处没有要使用的内容)
如果要从主题开头开始阅读现有事件,则需要添加

mp.messaging.incoming.{channel-name}.auto.offset.reset=earliest

相关问题