我有一个springkafka微服务,我最近添加了一个死信来发送各种错误消息
//some code..
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendDeadLetter(String message) {
kafkaTemplate.send("myDeadLetter", message);
}
}
我想把《死信》的主题Kafka称为“消息主题”+“死信”,我的主要主题是“消息主题”。在my consumer中,主题名为application.yml,如下所示:
@KafkaListener(topics = "${spring.kafka.topic.name}")
如何通过可能插入application.yml中的“+deadletter”来设置相同的Kafka主题?我试过这样一件事:
@Component
@KafkaListener(topics = "${spring.kafka.topic.name}"+"_deadLetter")
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendDeadLetter(String message) {
kafkaTemplate.send("messageTopic_deadLetter", message);
}
}
但它给我创造了两个同名的不同主题。我在等你的建议,谢谢你的帮助!
3条答案
按热度按时间ohtdti5x1#
kafka listener接受常量作为主题名,我们不能在这里修改主题名。
对于实际主题和死信主题,最好使用单独的方法(kafka侦听器),在yaml中定义两个不同的属性来保存两个主题名。
从yml或属性文件中引用send(…)内的主题名
nle07wnf2#
可以使用spel构造主题名称:
请注意属性占位符和文字周围的单引号。
fslejnso3#
这个例子可能与你的用例无关,但是分享它以防对某人有所帮助。
如果您正在构建kafka流应用程序,则可以通过以下方式获得可变接收器主题名称:
在向sink主题生成时,传递一个lambda,该lambda将上下文作为参数,并传递将处理名称定义的方法。
实现生成接收器主题名称的方法:
上面的例子很简单,可以简化为
(k, v, ctx) -> ctx.topic().concat("_deadLetter")
,但我希望在需要进一步转换的情况下保留单独的方法方法,即当主题名称的一部分将被配置文件中定义的常量或regex替换时。