将kafka与activemq集成的spring boot应用程序

nr9pn0ug  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(363)

我正在尝试构建一个 Spring Boot 应用程序,从 Kafka 读取消息并将其写入 ActiveMQ,反之亦然(从 ActiveMQ 读取并写入 Kafka),但我没有找到任何有用的教程来快速启动我的项目。

w80xi6nr

w80xi6nr1#

请参见 Spring Integration 以及适用于 Apache Kafka 的 Spring Integration 扩展。
使用入站和出站通道适配器

jms -> kafka

kafka -> jms

Kafka Connect 在这方面也提供了一些功能,但我对它并不熟悉。
编辑
下面这个简单的 Spring Boot 应用程序演示了如何将数据从 Kafka 传输到 RabbitMQ,以及反向传输:

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * Demo application that relays a single message Kafka -> RabbitMQ -> Kafka.
 *
 * <p>Flow, as wired below:
 * <ol>
 *   <li>On startup, {@code "foo"} is sent to the first Kafka topic.</li>
 *   <li>{@link #listen1(String)} consumes it, upper-cases it and forwards it to
 *       the RabbitMQ queue.</li>
 *   <li>{@link #listen2(String)} consumes it from the queue, concatenates it with
 *       itself and sends the result to the second Kafka topic.</li>
 *   <li>{@link #listen(String)} consumes and prints the final value.</li>
 * </ol>
 */
@SpringBootApplication
public class So61069735Application {

    /** Kafka topic that receives the initial message. */
    private static final String SOURCE_TOPIC = "so61069735-1";

    /** RabbitMQ queue used for the intermediate hop. */
    private static final String RELAY_QUEUE = "so61069735-2";

    /** Kafka topic that receives the message coming back from RabbitMQ. */
    private static final String RESULT_TOPIC = "so61069735-3";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Application entry point.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(So61069735Application.class, args);
    }

    /**
     * Startup hook that seeds the pipeline with the payload {@code "foo"}.
     *
     * @return a runner that publishes the seed message to the first Kafka topic
     */
    @Bean
    public ApplicationRunner toKafka() {
        return startupArgs -> {
            kafkaTemplate.send(SOURCE_TOPIC, "foo");
        };
    }

    /**
     * Kafka -> RabbitMQ hop: logs the record, then forwards it upper-cased.
     *
     * @param in payload read from the first Kafka topic
     */
    @KafkaListener(id = SOURCE_TOPIC, topics = SOURCE_TOPIC)
    public void listen1(String in) {
        System.out.println("From Kafka: " + in);
        String upperCased = in.toUpperCase();
        rabbitTemplate.convertAndSend(RELAY_QUEUE, upperCased);
    }

    /**
     * RabbitMQ -> Kafka hop: logs the message, then forwards it doubled.
     *
     * @param in payload read from the RabbitMQ queue
     */
    @RabbitListener(queues = RELAY_QUEUE)
    public void listen2(String in) {
        System.out.println("From Rabbit: " + in);
        String doubled = in + in;
        kafkaTemplate.send(RESULT_TOPIC, doubled);
    }

    /**
     * Terminal consumer: prints whatever arrives on the second Kafka topic.
     *
     * @param in payload read from the second Kafka topic
     */
    @KafkaListener(id = RESULT_TOPIC, topics = RESULT_TOPIC)
    public void listen(String in) {
        System.out.println("Final: " + in);
    }

    /**
     * Declares the first Kafka topic.
     *
     * @return topic definition with one partition and one replica
     */
    @Bean
    public NewTopic topic1() {
        return singlePartitionTopic(SOURCE_TOPIC);
    }

    /**
     * Declares the durable RabbitMQ queue used for the intermediate hop.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(RELAY_QUEUE).build();
    }

    /**
     * Declares the second Kafka topic.
     *
     * @return topic definition with one partition and one replica
     */
    @Bean
    public NewTopic topic2() {
        return singlePartitionTopic(RESULT_TOPIC);
    }

    /** Builds a topic definition with one partition and one replica. */
    private static NewTopic singlePartitionTopic(String topicName) {
        return TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

结果

From Kafka: foo
From Rabbit: FOO
Final: FOOFOO

相关问题