spring kafka:如何在producer kafka中插入application.yml主题

wooyq4lh  于 2021-07-13  发布在  Java
关注(0)|答案(3)|浏览(344)

我有一个springkafka微服务,我最近添加了一个死信来发送各种错误消息

//some code..
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("myDeadLetter", message);
    }
}

我想把《死信》的主题Kafka称为“消息主题”+“死信”,我的主要主题是“消息主题”。在my consumer中,主题名为application.yml,如下所示:

@KafkaListener(topics = "${spring.kafka.topic.name}")

如何通过可能插入application.yml中的“+deadletter”来设置相同的Kafka主题?我试过这样一件事:

@Component
@KafkaListener(topics = "${spring.kafka.topic.name}"+"_deadLetter")
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("messageTopic_deadLetter", message);
    }
}

但它给我创造了两个同名的不同主题。我在等你的建议,谢谢你的帮助!

ohtdti5x

ohtdti5x1#

kafka listener接受常量作为主题名,我们不能在这里修改主题名。
对于实际主题和死信主题,最好使用单独的方法(kafka侦听器),在yaml中定义两个不同的属性来保存两个主题名。

@KafkaListener(topics = "${spring.kafka.topic.name}")
public void listen(......){

}

@KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
public void listenDlt(......){

}


从yml或属性文件中引用send(…)内的主题名

@Component
@KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
public class KafkaProducer {

    @Value("${spring.kafka.deadletter.topic.name}")
    private String DLT_TOPIC_NAME;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send(DLT_TOPIC_NAME, message);
    }
}
nle07wnf

nle07wnf2#

可以使用spel构造主题名称:

@KafkaListener(topics = "#{'${spring.kafka.topic.name}' + '_deadLetter'"})

请注意属性占位符和文字周围的单引号。

fslejnso

fslejnso3#

这个例子可能与你的用例无关,但是分享它以防对某人有所帮助。
如果您正在构建kafka流应用程序,则可以通过以下方式获得可变接收器主题名称:
在向sink主题生成时,传递一个lambda,该lambda将上下文作为参数,并传递将处理名称定义的方法。

... /* precedent stream operations */
        // terminal operation 'to'. 
        .to(
            (k, v, ctx) -> sinkTopicNameGenerator(ctx),
            Produced.with(Serdes, Serdes)
        );

实现生成接收器主题名称的方法:

protected static String sinkTopicNameGenerator(RecordContext ctx) {
    return ctx.topic().concat("_deadLetter");
  }

上面的例子很简单,可以简化为 (k, v, ctx) -> ctx.topic().concat("_deadLetter") ,但我希望在需要进一步转换的情况下保留单独的方法方法,即当主题名称的一部分将被配置文件中定义的常量或regex替换时。

相关问题