我遵循quarkus-使用带有React式消息传递的apache kafka创建一个示例来体验它,我将消息流更改为:
保存帖子后,通过cdi触发一个事件。
收到cdi并发送给Kafka主题。
从kafka主题中读取数据,并将其作为sse公开给客户端。
Kafka消息的配置,属于 application.properties
.
# Consume data from Kafka
mp.messaging.incoming.activities.connector=smallrye-kafka
mp.messaging.incoming.activities.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
# Produce data to Kafka
mp.messaging.outgoing.activitiesOut.connector=smallrye-kafka
mp.messaging.outgoing.activitiesOut.topic=activities
mp.messaging.outgoing.activitiesOut.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
cdi事件和React消息的事件处理类。
@ApplicationScoped
public class ActivityStreams {
ReplaySubject<JsonObject> replaySubject;
Flowable<JsonObject> flowable;
@PostConstruct public void init() {
replaySubject = ReplaySubject.create();
flowable = replaySubject.share().toFlowable(BackpressureStrategy.BUFFER);
}
public void onActivityCreated(@ObservesAsync Activity activity) {
replaySubject.onNext(JsonObject.mapFrom(activity));
}
@Outgoing("activitiesOut")
public Publisher<JsonObject> onReceivedActivityCreated() {
return flowable;
}
@Incoming("activities")
@Outgoing("my-data-stream")
@Broadcast
public Activity onActivityReceived(JsonObject data) {
Activity activity = data.mapTo(Activity.class);
activity.setOccurred(LocalDateTime.now());
return activity;
}
}
当我试图将其公开为sse时,它并没有按预期工作。
@Path("/activities")
@ApplicationScoped
public class ActivityResource {
@Inject
@Channel("my-data-stream")
public Publisher<Activity> stream;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
Publisher<Activity> eventStream(){
return stream;
}
}
在控制台日志中,我看到了发送到活动队列的消息,但没有进一步的步骤。当我通过 curl
,它总是返回未找到状态。
curl -v -N -H "Accept:text/event-stream" http://localhost:8080/activities --connect-timeout 60
...
HTTP/1.1 404 Not Found
完整的示例代码在这里。
暂无答案!
目前还没有任何答案,快来回答吧!