Spring Kafka整合

u7up0aaq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(372)

我必须把Kafka和古巴结合起来,我想这就像增加对Kafka的依赖和创建一个 Configuration 带注解的类来初始化kafka消费者,因为cuba是基于spring的。
当我添加一个配置时,我发现它在cuba启动时没有被扫描。当我切换到cuba视图时,我注意到只有那些被注解为 Service 或者 Component 将被读取。但是,即使我加上 Component 类,它仍然没有被正确扫描(我添加了一个用 @Value 它查找一个不存在的属性,但在启动它时cuba没有抛出任何错误)

zdwk9cvp

zdwk9cvp1#

古巴+Kafka一体化有一个简单的例子,你可以在这里找到:https://github.com/cuba-labs/kafka-sample
配置过程取自spring官方文档。
关键配置类是com.company.kafkasample.config.kafkanconfig。它包含许多bean,可以帮助您配置kafka设施。在这个特定的示例中,同时配置了生产者和消费者。请注意,配置参数是硬编码的,但这只是一个示例。

@Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

之后,您应该能够创建一个服务,通过注入kafkatemplatebean将消息发送到kafka队列。

@Inject
    private KafkaTemplate<Integer, String> template;

    @Override
    public void sendMessage(String message) {
        log.info("Sending {} using Kafka", message);
        long id = uniqueNumbersService.getNextNumber("users");
        ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
               log.info("Failed to send message {}, error {}", message, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message {} sent", message);
            }
        });
    }

然后您可以将此服务注入屏幕并在那里使用它。
对于接收器,您可以使用cuba组件并注解其方法 @KafkaListener 注解。例如,下面的示例将kafka消息保存到数据库中。

@Component
@DependsOn("consumerFactory")
public class MessageListener {

    @Inject
    private DataManager dataManager;

    @KafkaListener(id = "sample-kafka", topics = "users")
    public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
        KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
        kafkaMessage.setKafkaID(id);
        kafkaMessage.setContent(foo);
        dataManager.commit(kafkaMessage);
    }

相关问题