java 从Kafka到grpc的真实的信息生成[已关闭]

e5njpo68  于 2023-02-18  发布在  Java
关注(0)|答案(2)|浏览(137)
    • 已关闭**。此问题为opinion-based。当前不接受答案。
    • 想要改进此问题吗?**请更新此问题,以便editing this post可以用事实和引文来回答。

6小时前关门了。
Improve this question
我有一个通过grpc的双向频道

@Override
public StreamObserver<MessageReceiver.MessageRequest> biDirectionalMessageStream(StreamObserver<MessageReceiver.MessageResponse> responseObserver) {
    return new StreamObserver<>() {
        @Override
        public void onNext(MessageReceiver.MessageRequest messageRequest) {
            responseObserver.onNext(MessageReceiver.MessageResponse.newBuilder()
                    .setMessage("SomeMessage")
                    .build());
        }

        @Override
        public void onError(Throwable throwable) {
            responseObserver.onError(throwable);
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}

消息源是Kafka。本主题中的消息是针对不同的grpc连接的。在收到grpc消息时,连接可能尚未打开。如何在打开grpc连接时传输打开连接之前收到的所有消息,然后从Kafka实时传输这些消息?
我看到几个选项:
1.在内存中创建一个缓冲区,将来自kafka的消息读取到该缓冲区中,并在打开连接时将消息从缓冲区发送到grpc通道
1.建立持久存储(redis/mongo/postgres等)并使用它代替缓冲区
不确定哪个解决方案更好(如果是后者,不确定选择哪个存储)。负载配置文件-17,000 rpc

tsm1rwdh

tsm1rwdh1#

我理解你的问题,你把数据放在哪里并不重要,假设你不关心Kafka的保留策略,并适当地处理消耗的抵消。
例如,你将每个事件添加到一个数组列表中,然后提交这些偏移量,但是从来没有建立连接,你会丢失数据;或者,你确实建立了gRPC连接,但是只产生了一些数据(比如,你的数组列表最终导致了OOM),数据仍然丢失,并且可能在重新启动时被复制。
如果您可以完全控制发送初始请求的gRPC客户机,为什么不让它自己成为Kafka生产者,而不向某个“代理”发送gRPC请求来完成这项工作呢?
关于存储,您可以使用这些选项中的任何一个,但是Mongo或Postgres与Debezium的集成会更好,如果您想使用它来实际将数据导入Kafka的话。

bqf10yzr

bqf10yzr2#

使用基于拉的Kafka时,最好使用gRPC客户端从Kafka获取消息,因此它们之间不需要缓冲区。

相关问题