我是kafka和quarkus的新手,我想在处理用户请求时向kafka主题发送消息。
我已经阅读了quarkus quickstart中提供的kafka示例。我试过用Kafka消息
// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
generateSingle();
return "hello";
}
@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
return KafkaMessage.of(1, "value");
};
但我得到的结果是,给Kafka的留言主题不断。
我想知道有没有其他方法或者我的代码有什么问题。
感谢您的帮助
1条答案
按热度按时间f5emj3cl1#
目前关于此主题的文档非常简洁且不完整(quarkus 0.25.0)。我设法做到了,但它需要大量的实验和一些我相信是一个黑客,希望将在quarkus的更高版本的补救。
原则是
@Outgoing
方法必须生成由外部控制的流。这是通过创建流来完成的Flowable.create()
在一个@PostConstruct
方法,并将发射器公开给类成员。这个@Outgoing
方法只返回已构造的流。以下组件公开了一个公共方法,
produce(String message)
将向Kafka发送短信:我在生成的quarkus应用程序中创建了这个类,如下所述:
和配置(
application.properties
)具体如下:kafka示例的启动完全如快速启动中所述。你可以看电视
test
控制台侦听器的主题如下:为了测试它,您可以创建一个jax-rs资源来调用它
produce()
:从命令行调用它,如下所示,并观察控制台使用者:
黑客:看来
produceKafkaMessage()
与@Outgoing("kafka-test")
失败,因为quarkus不明白KafkaMessage
是一个Message
,并将其 Package 为一个,从而导致序列化错误。我正在用"internal"
溪流。