我正在尝试构建一个spring引导应用程序,从kafka读取消息并将其放入activemq,反之亦然(从activemq读取并写入kafka),但我没有找到任何有用的教程来快速启动我的项目
w80xi6nr1#
请参见针对apachekafka的spring集成和spring集成扩展。使用入站和出站通道适配器
jms -> kafka kafka -> jms
Kafka连接在这个领域也有一些功能,但我不熟悉。编辑这个简单的spring boot应用程序显示了如何将数据从kafka传输到rabbitmq,反之亦然:
package com.example.demo; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.KafkaTemplate; @SpringBootApplication public class So61069735Application { public static void main(String[] args) { SpringApplication.run(So61069735Application.class, args); } @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private RabbitTemplate rabbitTemplate; @Bean public ApplicationRunner toKafka() { return args -> this.kafkaTemplate.send("so61069735-1", "foo"); } @KafkaListener(id = "so61069735-1", topics = "so61069735-1") public void listen1(String in) { System.out.println("From Kafka: " + in); this.rabbitTemplate.convertAndSend("so61069735-2", in.toUpperCase()); } @RabbitListener(queues = "so61069735-2") public void listen2(String in) { System.out.println("From Rabbit: " + in); this.kafkaTemplate.send("so61069735-3", in + in); } @KafkaListener(id = "so61069735-3", topics = "so61069735-3") public void listen(String in) { System.out.println("Final: " + in); } @Bean public NewTopic topic1() { return TopicBuilder.name("so61069735-1").partitions(1).replicas(1).build(); } @Bean public Queue queue() { return QueueBuilder.durable("so61069735-2").build(); } @Bean public NewTopic topic2() { return TopicBuilder.name("so61069735-3").partitions(1).replicas(1).build(); } }
spring.kafka.consumer.auto-offset-reset=earliest
结果
From Kafka: foo From Rabbit: FOO Final: FOOFOO
1条答案
按热度按时间w80xi6nr1#
请参见针对apachekafka的spring集成和spring集成扩展。
使用入站和出站通道适配器
Kafka连接在这个领域也有一些功能,但我不熟悉。
编辑
这个简单的spring boot应用程序显示了如何将数据从kafka传输到rabbitmq,反之亦然:
结果