spring-kafka:ApplicationContext 中针对不同对象类型的多个侦听器

disho6za  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(400)

我想向社区确认一下:如果要监听多个主题,而每个主题包含不同类型的消息,最佳做法是什么?
过去几天我一直在尝试 Spring Kafka。到目前为止我的思路如下:
因为在初始化 KafkaListenerContainerFactory 时,需要将反序列化器传递给 DefaultKafkaConsumerFactory,这似乎表明:如果我需要多个容器,且每个容器反序列化不同类型的消息,我将无法使用 @EnableKafka 和 @KafkaListener 注解。
这让我想到,唯一的方法就是实例化多个 KafkaMessageListenerContainer。
鉴于 KafkaMessageListenerContainer 是单线程的,而我需要同时监听多个主题,我其实应该使用多个 ConcurrentMessageListenerContainer。
我会在这里的正确轨道上吗?有没有更好的办法?
谢谢!

kqhtkvqz

kqhtkvqz1#

我想用下面的代码来应用你的观点

@Configuration
@EnableKafka
public class ConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.group-id}")
    private String groupId;

    /**
     * Configuration of Consumer properties.
     * 
     * @return
     */
    //@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return props;
    }

    //@Bean
    public ConsumerFactory<String, ClassA> consumerFactory1() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ClassA());
    }

    /**
     * Kafka Listener Container Factory.
     * @return
     */
    @Bean("kafkaListenerContainerFactory1")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassA>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, ClassA> factory;
        factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        return factory;
    }

    //@Bean
    public ConsumerFactory<String, ClassB> consumerFactory2() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ClassB());
    }

    /**
     * Kafka Listener Container Factory.
     * @return
     */
    @Bean("kafkaListenerContainerFactory2")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassB>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, ClassB> factory;
        factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory2());
        return factory;
    }

    @Bean
    public ReceiverClass receiver() {
        return new ReceiverClass();
    }

    class ReceiverClass {
        @KafkaListener(topics = "topic1", group = "group-id-test",
                containerFactory = "kafkaListenerContainerFactory1")
        public void receiveTopic1(ClassA a) {
            System.out.println("ReceiverClass.receive() ClassA : " + a);
        }

        @KafkaListener(topics = "topic2", group = "group-id-test",
                containerFactory = "kafkaListenerContainerFactory2")
        public void receiveTopic2(ClassB b) {
            System.out.println("ReceiverClass.receive() Classb : " + b);
        }

    }

    class ClassB  implements Deserializer  {

        @Override
        public void configure(Map configs, boolean isKey) {
            // TODO Auto-generated method stub

        }

        @Override
        public Object deserialize(String topic, byte[] data) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public void close() {
            // TODO Auto-generated method stub

        }

    }

    class ClassA  implements Deserializer {

        @Override
        public void configure(Map configs, boolean isKey) {
            // TODO Auto-generated method stub

        }

        @Override
        public Object deserialize(String topic, byte[] data) {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public void close() {
            // TODO Auto-generated method stub

        }

    }
}
bxjv4tth

bxjv4tth2#

这是一个非常简单的例子。

// -----------------------------------------------
// Sender
// -----------------------------------------------

/**
 * Producer-side Spring configuration: one producer factory, one
 * {@code KafkaTemplate} and one sender bean per payload type
 * ({@code Class1} and {@code Class2}), all built from the same producer properties.
 */
@Configuration
public class SenderConfig {

    /**
     * Producer properties shared by both producer factories. Keys are serialized
     * with {@code StringSerializer} and values with {@code JsonSerializer}; the
     * remaining properties are elided in this example.
     *
     * @return the producer property map
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    // -------- first payload type: Class1 --------

    /** @return the producer factory for {@code Class1} values */
    @Bean
    public ProducerFactory<String, Class1> producerFactory1() {
        Map<String, Object> configs = producerConfigs();
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /** @return the template built on {@link #producerFactory1()} */
    @Bean
    public KafkaTemplate<String, Class1> kafkaTemplate1() {
        ProducerFactory<String, Class1> factory = producerFactory1();
        return new KafkaTemplate<>(factory);
    }

    /** @return a new {@code Sender1} */
    @Bean
    public Sender1 sender1() {
        return new Sender1();
    }

    // -------- second payload type: Class2 --------

    /** @return the producer factory for {@code Class2} values */
    @Bean
    public ProducerFactory<String, Class2> producerFactory2() {
        Map<String, Object> configs = producerConfigs();
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /** @return the template built on {@link #producerFactory2()} */
    @Bean
    public KafkaTemplate<String, Class2> kafkaTemplate2() {
        ProducerFactory<String, Class2> factory = producerFactory2();
        return new KafkaTemplate<>(factory);
    }

    /** @return a new {@code Sender2} */
    @Bean
    public Sender2 sender2() {
        return new Sender2();
    }
}

/**
 * Thin wrapper that publishes {@code Class1} payloads through an injected
 * {@code KafkaTemplate<String, Class1>}.
 */
public class Sender1 {
    // Field-injected template used by send().
    @Autowired
    private KafkaTemplate<String, Class1> kafkaTemplate1;

    /**
     * Hands {@code c1} to the template for delivery to {@code topic}.
     * The value returned by the template's {@code send} call is discarded.
     *
     * @param topic destination topic name
     * @param c1    payload to send
     */
    public void send(String topic, Class1 c1) {
        kafkaTemplate1.send(topic, c1);
   }
}

/**
 * Thin wrapper that publishes {@code Class2} payloads through an injected
 * {@code KafkaTemplate<String, Class2>}.
 */
public class Sender2 {
    // Field-injected template used by send().
    @Autowired
    private KafkaTemplate<String, Class2> kafkaTemplate2;

    /**
     * Hands {@code c2} to the template for delivery to {@code topic}.
     * The value returned by the template's {@code send} call is discarded.
     *
     * @param topic destination topic name
     * @param c2    payload to send
     */
    public void send(String topic, Class2 c2) {
        kafkaTemplate2.send(topic, c2);
    }
}

// -----------------------------------------------
// Receiver
// -----------------------------------------------

/**
 * Consumer-side Spring configuration: for each payload type ({@code Class1},
 * {@code Class2}) it declares a consumer factory, a listener container factory and
 * a receiver bean. Listener methods select their factory by bean name.
 */
@Configuration
@EnableKafka
public class ReceiverConfig {

    /**
     * Consumer properties shared by both consumer factories. Declares
     * {@code StringDeserializer} for keys and {@code JsonDeserializer} for values;
     * the remaining properties are elided in this example.
     *
     * @return the consumer property map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    // -------- first listener: Class1 --------

    /** @return a consumer factory whose value deserializer targets {@code Class1} */
    @Bean
    public ConsumerFactory<String, Class1> consumerFactory1() {
        Map<String, Object> configs = consumerConfigs();
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Class1> valueDeserializer = new JsonDeserializer<>(Class1.class);
        return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
    }

    /** @return the container factory backed by {@link #consumerFactory1()} */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory1());
        return containerFactory;
    }

    /** @return a new {@code Receiver1} */
    @Bean
    public Receiver1 receiver1() {
        return new Receiver1();
    }

    // -------- second listener: Class2 --------

    /** @return a consumer factory whose value deserializer targets {@code Class2} */
    @Bean
    public ConsumerFactory<String, Class2> consumerFactory2() {
        Map<String, Object> configs = consumerConfigs();
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Class2> valueDeserializer = new JsonDeserializer<>(Class2.class);
        return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
    }

    /** @return the container factory backed by {@link #consumerFactory2()} */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Class2> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory2());
        return containerFactory;
    }

    /** @return a new {@code Receiver2} */
    @Bean
    public Receiver2 receiver2() {
        return new Receiver2();
    }
}

/**
 * Listener bean for "topic1"; payloads arrive as {@code Class1} via the container
 * factory named "kafkaListenerContainerFactory1".
 */
public class Receiver1 {
    // The snippet used LOGGER without declaring it. A JDK logger is declared here by
    // fully-qualified name so the class is self-contained and needs no extra import.
    // NOTE(review): swap for the project's logging facade if one is in use.
    private static final java.util.logging.Logger LOGGER =
        java.util.logging.Logger.getLogger(Receiver1.class.getName());

    /**
     * Invoked for each record received on "topic1"; logs a fixed message.
     *
     * @param c1 the deserialized payload (not otherwise used)
     */
    @KafkaListener(id="listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
    public void receive(Class1 c1) {
        LOGGER.info("Received c1");
    }
}

/**
 * Listener bean for "topic2"; payloads arrive as {@code Class2} via the container
 * factory named "kafkaListenerContainerFactory2".
 */
public class Receiver2 {
    // The snippet used LOGGER without declaring it. A JDK logger is declared here by
    // fully-qualified name so the class is self-contained and needs no extra import.
    // NOTE(review): swap for the project's logging facade if one is in use.
    private static final java.util.logging.Logger LOGGER =
        java.util.logging.Logger.getLogger(Receiver2.class.getName());

    /**
     * Invoked for each record received on "topic2"; logs a fixed message.
     *
     * @param c2 the deserialized payload (not otherwise used)
     */
    @KafkaListener(id="listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(Class2 c2) {
        LOGGER.info("Received c2");
    }
}
xoefb8l8

xoefb8l83#

您可以使用注解,只需要为每个注解使用不同的侦听器容器工厂。
框架将为每个注解创建一个侦听器容器。
你也可以在单线程容器上监听多个主题,但这些主题的消息都会在同一个线程上被处理。
看看我去年在 SpringOne Platform 上的演讲中的代码——你可能想看看 app6,它展示了如何使用 MessageConverter 而不是反序列化器,这可能有助于简化配置。

相关问题