spring云流kafka生产者消息

vwhgwdsa  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(449)

我想建立一个Spring云流Kafka生产者与 Spring 启动。
生产者正在工作,我可以使用来自kafka代理的消息,但是消息还包含一些头信息,如以下信息:

contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"message":"hello"}

我的pojo包含一个字段(字符串消息),所以我希望只有json字符串会被发送到kafka。
restcontroller中的方法test()触发生产者:

@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {

private MessageChannel consumer;

public KafkaStreamProducerApplication(ProducerChannels channels) {
    this.consumer = channels.consumer();
}

@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
    Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
    this.consumer.send(msg);
}

interface ProducerChannels {

    @Output
    MessageChannel consumer();
}

我的应用程序.properties

spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json

如果您能推荐任何关于这个主题的文档或示例,我也将不胜感激。github上的例子通常都很简单,它们使用了很多自动配置,没有任何解释。我使用的例子是rabbitmq。

hsgswve4

hsgswve41#

这个 contentType 以及 originalContentType 当消费者应用程序反序列化消息并基于内容类型集执行消息转换时,spring cloud stream使用头。
这个 contentType 只有当您像这里那样配置绑定的内容类型时,才会显式设置头 spring.cloud.stream.bindings.consumer.content-type=application/json . 当 contentType header设置后,spring cloud stream使用 originalContentType 在序列化/反序列化过程中(通过绑定器)向代理生成/使用来自代理的消息的标志。
在你的情况下,我想你可能不需要设置 contentType 完全。
除了spring云流示例github repo中的示例之外,您还可以参考现成的应用程序启动程序,它涵盖了可以针对任何受支持的绑定(包括kafka)运行的各种应用程序。

55ooxyrt

55ooxyrt2#

如果你想避免嵌入消息头(这样你就可以在一些非spring的云流应用程序中接收消息),设置生产者的 headerModeraw .
请参见生产者属性。
头模
当设置为raw时,禁用输出上的头嵌入。仅对本机不支持消息头并且需要嵌入消息头的消息传递中间件有效。在为非spring云流应用程序生成数据时非常有用。
默认值:embeddedheaders。

相关问题