Kafka 从Spring云流消息频道获取主题名称

pgky5nke  于 2022-12-22  发布在  Apache
关注(0)|答案(2)|浏览(158)

我们在Sping Boot 应用程序中使用Kafka Cloud Stream向Kafka发送数据。如下所示

producerChannel.send(MessageBuilder
                    .withPayload(data)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build())

我想知道除了直接从yaml文件阅读之外,是否可以从MessageChannel获取主题名称?

@Output("topic-name-out")
MessageChannel producerChannel();

主题名称出现在Kafka中

spring:
  cloud:
    stream:
      bindings:
        topic-name-out:
          destination: topic_name_to_producer
          contentType: application/json
          producer:
            partitionCount: ${partition_count:3}
qcbq4gxm

qcbq4gxm1#

可以创建Bean并将其绑定到生产者主题

@Bean
MessageChannel producerLogger(){
   return (message,l)->{
      RecordMetadata meta = message.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class);
            Object txnId = message.getHeaders().get("txnId");
            if (Objects.nonNull(meta) && Objects.nonNull(txnId) && txnId instanceof byte[]) {
                log.trace("Topic [{}] Partition [{}] Offset [{}] TxnId [{}]", meta.topic(), meta.partition(), meta.offset(),
                    new String((byte[]) txnId));
            }
    }

返回true;}
这将为您提供应用程序生成消息的主题、分区和偏移集。

mtb9vblg

mtb9vblg2#

我看到您正在使用基于注解的编程模型(例如,@Output)。它已经过时3年多了,正在从代码库中删除。请升级到函数模型。
至于你关于destination名称是外部配置的问题,有一种方法可以通过绑定以编程方式访问它,但我更好奇的是,为什么你需要它,因为它是一个内部细节,而且考虑到它是外部配置的,它可以在没有通知的情况下更改,从而影响你的代码。

相关问题