我想建立一个Spring云流Kafka生产者与 Spring 启动。
生产者正在工作,我可以使用来自kafka代理的消息,但是消息还包含一些头信息,如以下信息:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"}
我的pojo包含一个字段(字符串消息),所以我希望只有json字符串会被发送到kafka。
restcontroller中的方法test()触发生产者:
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {
private MessageChannel consumer;
public KafkaStreamProducerApplication(ProducerChannels channels) {
this.consumer = channels.consumer();
}
@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
this.consumer.send(msg);
}
interface ProducerChannels {
@Output
MessageChannel consumer();
}
我的应用程序.properties
spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json
如果您能推荐任何关于这个主题的文档或示例,我也将不胜感激。github上的例子通常都很简单,它们使用了很多自动配置,没有任何解释。我使用的例子是rabbitmq。
2条答案
按热度按时间hsgswve41#
这个
contentType
以及originalContentType
当消费者应用程序反序列化消息并基于内容类型集执行消息转换时,spring cloud stream使用头。这个
contentType
只有当您像这里那样配置绑定的内容类型时,才会显式设置头spring.cloud.stream.bindings.consumer.content-type=application/json
. 当contentType
header设置后,spring cloud stream使用originalContentType
在序列化/反序列化过程中(通过绑定器)向代理生成/使用来自代理的消息的标志。在你的情况下,我想你可能不需要设置
contentType
完全。除了spring云流示例github repo中的示例之外,您还可以参考现成的应用程序启动程序,它涵盖了可以针对任何受支持的绑定(包括kafka)运行的各种应用程序。
55ooxyrt2#
如果你想避免嵌入消息头(这样你就可以在一些非spring的云流应用程序中接收消息),设置生产者的
headerMode
至raw
.请参见生产者属性。
头模
当设置为raw时,禁用输出上的头嵌入。仅对本机不支持消息头并且需要嵌入消息头的消息传递中间件有效。在为非spring云流应用程序生成数据时非常有用。
默认值:embeddedheaders。