将cdi事件桥接到文件响应式消息代理

rggaifut  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(273)

我遵循quarkus-使用带有React式消息传递的apache kafka创建一个示例来体验它,我将消息流更改为:
保存帖子后,通过cdi触发一个事件。
收到cdi并发送给Kafka主题。
从kafka主题中读取数据,并将其作为sse公开给客户端。
Kafka消息的配置,属于 application.properties .


# Consume data from Kafka

mp.messaging.incoming.activities.connector=smallrye-kafka
mp.messaging.incoming.activities.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer

# Produce data to Kafka

mp.messaging.outgoing.activitiesOut.connector=smallrye-kafka
mp.messaging.outgoing.activitiesOut.topic=activities
mp.messaging.outgoing.activitiesOut.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer

cdi事件和React消息的事件处理类。

@ApplicationScoped
public class ActivityStreams {

    ReplaySubject<JsonObject> replaySubject;
    Flowable<JsonObject> flowable;

    @PostConstruct public void init() {
        replaySubject = ReplaySubject.create();
        flowable = replaySubject.share().toFlowable(BackpressureStrategy.BUFFER);
    }

    public void onActivityCreated(@ObservesAsync Activity activity) {
        replaySubject.onNext(JsonObject.mapFrom(activity));
    }

    @Outgoing("activitiesOut")
    public Publisher<JsonObject> onReceivedActivityCreated() {
        return flowable;
    }

    @Incoming("activities")
    @Outgoing("my-data-stream")
    @Broadcast
    public Activity onActivityReceived(JsonObject data) {
        Activity activity = data.mapTo(Activity.class);
        activity.setOccurred(LocalDateTime.now());
        return activity;
    }

}

当我试图将其公开为sse时,它并没有按预期工作。

@Path("/activities")
@ApplicationScoped
public class ActivityResource {

    @Inject
    @Channel("my-data-stream")
    public Publisher<Activity> stream;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    Publisher<Activity> eventStream(){
        return stream;
    }
}

在控制台日志中,我看到了发送到活动队列的消息,但没有进一步的步骤。当我通过 curl ,它总是返回未找到状态。

curl -v -N  -H "Accept:text/event-stream" http://localhost:8080/activities  --connect-timeout 60

...
HTTP/1.1 404 Not Found

完整的示例代码在这里。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题