使用quarkus将消息从kafka流发送到websocket

b1uwtaje  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(550)

所以我不知道怎么做。我已经使用quarkus和MicrofileReact式消息传递框架以及javax.websocket库的东西完成了这项工作,但我不确定如何使用kafka流将其移植到。使用mp被动消息传递,我可以在我的其他类中的某个通道上有一个@outgoing注解,然后使用websocket服务,我可以像这样从该通道注入。

@ServerEndpoint("/validatedmessages")
@ApplicationScoped
public class WebSocket {

@Inject @Channel("post-final-check") Flowable<CustomMessage> finalizedMessages;
//private Jsonb jsonb;
ObjectMapper obj = new ObjectMapper(); 

private static final Logger LOGGER = Logger.getLogger(WebSocket.class);

private List<Session> sessions = new CopyOnWriteArrayList<>();
private Disposable subscription;

@OnOpen
public void onOpen(Session session) {
    sessions.add(session);
}

@OnClose
public void onClose(Session session) {
    sessions.remove(session);
}

@PostConstruct
public void subscribe() {
    subscription = finalizedMessages.subscribe(message -> sessions.forEach(session -> {
        try {
            write(session, message);
        } catch (JsonProcessingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }));
}

@PreDestroy
public void cleanup() throws Exception {
    subscription.dispose();
    //jsonb.close();
}

有没有可能用Kafka来做这个?

sqxo8psd

sqxo8psd1#

Kafka流的输出存储在哪里?主题还是表格?
如果是第一个,那么可以插入发布者或使用@incoming(topic)。否则您需要使用interactivequeries,请参见此处的示例:
https://quarkus.io/guides/kafka-streams#interactive-查询

相关问题