quarkus中有没有向kafka发送消息的函数

sycxhyv7  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(473)

我是kafka和quarkus的新手,我想在处理用户请求时向kafka主题发送消息。
我已经阅读了quarkus quickstart中提供的kafka示例。我试过用Kafka消息

// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
    generateSingle();
    return "hello";
}

@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
    return KafkaMessage.of(1, "value");
};

但我得到的结果是,给Kafka的留言主题不断。
我想知道有没有其他方法或者我的代码有什么问题。
感谢您的帮助

f5emj3cl

f5emj3cl1#

目前关于此主题的文档非常简洁且不完整(quarkus 0.25.0)。我设法做到了,但它需要大量的实验和一些我相信是一个黑客,希望将在quarkus的更高版本的补救。
原则是 @Outgoing 方法必须生成由外部控制的流。这是通过创建流来完成的 Flowable.create() 在一个 @PostConstruct 方法,并将发射器公开给类成员。这个 @Outgoing 方法只返回已构造的流。
以下组件公开了一个公共方法, produce(String message) 将向Kafka发送短信:

package ...

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;

@ApplicationScoped
public class KafkaController {

    private FlowableEmitter<KafkaMessage<String, String>> emitter;

    private Flowable<KafkaMessage<String, String>> outgoingStream;

    @PostConstruct
    void init() {
        outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
    }

    public void produce(String message) {
        emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
    }

    @PreDestroy
    void dispose() {
        emitter.onComplete();
    }

    @Outgoing("internal")
    Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
        return outgoingStream;
    }

    @Incoming("internal")
    @Outgoing("kafka-test")
    KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
        return arg.getPayload();
    }
}

我在生成的quarkus应用程序中创建了这个类,如下所述:

mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"

和配置( application.properties )具体如下:

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer

kafka示例的启动完全如快速启动中所述。你可以看电视 test 控制台侦听器的主题如下:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning --group test-console.consumer

为了测试它,您可以创建一个jax-rs资源来调用它 produce() :

package ...

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/control")
public class KafkaProduceControlResource {

    @Inject
    KafkaController kafkaController;

    @POST
    @Path("/produce")
    public void produceMessage(String message) {
        kafkaController.produce(message);
    }
}

从命令行调用它,如下所示,并观察控制台使用者:

curl -i -s -X POST -d "A text message" \
    http://localhost:8080/control/produce

黑客:看来 produceKafkaMessage()@Outgoing("kafka-test") 失败,因为quarkus不明白 KafkaMessage 是一个 Message ,并将其 Package 为一个,从而导致序列化错误。我正在用 "internal" 溪流。

相关问题