我正在尝试创建一个应用程序,它将使用一个Kafka主题,有10个分区。我没有定义消费者的分区数为application.properties
,因为应用程序将部署在OpenShift中,我希望OpenShift进行负载平衡。问题是,当我启动应用程序时,它没有使用任何东西。代码非常简单,我真的不知道哪里出了问题:
这里是消费者,日志什么也没有显示:
@Incoming("plac-fate")
@Transactional
public void consume(NFeDistSVBAPayload payload) throws PlacException {
logger.info("Consumindo payload: " + payload);
service.criaNFeDistSVBAEntity(payload);
logger.info("Payload com nrProtocolo '" + payload.getNrProtocolo() + "' consumido com sucesso.");
}
我也有作为反序列化器它的日志没有显示任何东西:
@Override
public NFeDistSVBAPayload deserialize(String topic, byte[] data) {
logger.info("Deserializando mensagem do topico: " + topic);
var strPayload = new String(data, StandardCharsets.UTF_8);
var module = new JaxbAnnotationModule();
var mapper = JsonMapper.builder()
.enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
.build();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.registerModule(module);
try {
var payload = mapper.readValue(strPayload, NFeDistSVBAPayload.class);
return payload;
} catch (JsonProcessingException e) {
logger.error(e);
return null;
}
}
这里是àpplication.properties
:
quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=${PLAC_KAFKA_URL}
mp.messaging.incoming.plac-fate.connector=smallrye-kafka
mp.messaging.incoming.plac-fate.value.deserializer=br.gov.pr.fazenda.plac.dominio.utils.DistNFePayloadSVBADeserializer
mp.messaging.incoming.plac-fate.group.id=plac-fate-consumer
下面是应用程序日志:
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-10-11 18:53:42,919 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'plac-fate': [plac-fate]
2022-10-11 18:53:42,949 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18214: Key deserializer omitted, using String as default
2022-10-11 18:53:43,764 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-plac-fate, connected to Kafka brokers 'kafka:9092', belongs to the 'plac-fate-consumer' consumer group and is configured to poll records from [plac-fate]
2022-10-11 18:53:43,873 INFO [io.quarkus] (Quarkus Main Thread) consumer 1.0-SNAPSHOT on JVM (powered by Quarkus 2.12.3.Final) started in 12.193s. Listening on: http://localhost:8080
2022-10-11 18:53:43,873 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-10-11 18:53:43,874 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, hibernate-orm-panache, jdbc-oracle, kafka-client, micrometer, narayana-jta, qute, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
正如您所看到的,日志显示已创建并连接了使用者,但它什么也没做。
1条答案
按热度按时间rekjcdws1#
使用者默认从主题的最后开始(此处没有要使用的内容)
如果要从主题开头开始阅读现有事件,则需要添加