Kafka多个消费者在一个组(相同的groupId)中具有许多反序列化器,无法选择正确的侦听器

qfe3c7zg  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(137)

我在同一主题中为不同的事件类型设置了多个消费者,每个事件都有反序列化器组件。当收到消息时,反序列化程序不是正确的反序列化程序,并且不使用事件。在几次重试之后,它会获取正确的侦听器,但在这种情况下,我们会错过很多消息。但是当听众在不同的组中时,一切都按预期进行。
Kafka如何选择哪个反序列化器来选择,然后消费消息。
配置:

kafka:
    "FirstEventType":
      "[bootstrap.servers]": ${KAFKA_CONSUMER_BROKER_LIST:localhost:9092}
      "[group.id]": GroupOne
      "[client.id]": firstEventType
      "[auto.offset.reset]": earliest
      "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
      "[value.deserializer]":org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      "[spring.deserializer.value.delegate.class]": com.my.pack.svc.kafka.deserializers.FirstEventTypeDeserializer
      "[messageType]": FirstEventType
      topic: topicOne
    "SecondEventType":
      "[bootstrap.servers]": ${KAFKA_CONSUMER_BROKER_LIST:localhost:9092}
      "[group.id]": GroupOne
      "[client.id]": secondEventType
      "[auto.offset.reset]": earliest
      "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
      "[value.deserializer]": org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      "[spring.deserializer.value.delegate.class]": com.ft.subs.b2cfulfilsvc.kafka.deserializers.SecondEventTypeDeserializer
      "[messageType]": SecondEventType
      topic: topicOne

Kafka听众:

@KafkaListener(
      topics = "topicOne",
      containerFactory = "FirstEventTypeFactory",
      groupId = "GroupOne)
  public void onFirstEventType(
      FirstEventType eventType) {
   ....
  }
@KafkaListener(
      topics = "topicOne",
      containerFactory = "SecondEventTypeFactory",
      groupId = "GroupOne)
  public void onSecondEventType(
      SecondEventType eventType) {
   ....
  }
@Component
public class FirstEventTypeDeserializer
    extends AbstractMessageDeserializer<FirstEventType> {}
@Component
public class SecondEventTypeDeserializer
    extends AbstractMessageDeserializer<SecondEventType> {}
import org.apache.kafka.common.serialization.Deserializer;
...

public abstract class AbstractMessageDeserializer<T> implements Deserializer<T> {}
hgncfbus

hgncfbus1#

你的处决结果是正确的。同一个组中的多个消费者只会导致其中一个消费者从该主题中的一个分区消费。
您可能需要查看@KafkaHandler:https://docs.spring.io/spring-kafka/reference/html/#class-level-kafkalistener。

相关问题