- 已关闭**。此问题为opinion-based。当前不接受答案。
- 想要改进此问题吗?**请更新此问题,以便editing this post可以用事实和引文来回答。
6小时前关门了。
Improve this question
我有一个通过grpc的双向频道
@Override
public StreamObserver<MessageReceiver.MessageRequest> biDirectionalMessageStream(StreamObserver<MessageReceiver.MessageResponse> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(MessageReceiver.MessageRequest messageRequest) {
responseObserver.onNext(MessageReceiver.MessageResponse.newBuilder()
.setMessage("SomeMessage")
.build());
}
@Override
public void onError(Throwable throwable) {
responseObserver.onError(throwable);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
消息源是Kafka。本主题中的消息是针对不同的grpc连接的。在收到grpc消息时,连接可能尚未打开。如何在打开grpc连接时传输打开连接之前收到的所有消息,然后从Kafka实时传输这些消息?
我看到几个选项:
1.在内存中创建一个缓冲区,将来自kafka的消息读取到该缓冲区中,并在打开连接时将消息从缓冲区发送到grpc通道
1.建立持久存储(redis/mongo/postgres等)并使用它代替缓冲区
不确定哪个解决方案更好(如果是后者,不确定选择哪个存储)。负载配置文件-17,000 rpc
2条答案
按热度按时间tsm1rwdh1#
我理解你的问题,你把数据放在哪里并不重要,假设你不关心Kafka的保留策略,并适当地处理消耗的抵消。
例如,你将每个事件添加到一个数组列表中,然后提交这些偏移量,但是从来没有建立连接,你会丢失数据;或者,你确实建立了gRPC连接,但是只产生了一些数据(比如,你的数组列表最终导致了OOM),数据仍然丢失,并且可能在重新启动时被复制。
如果您可以完全控制发送初始请求的gRPC客户机,为什么不让它自己成为Kafka生产者,而不向某个“代理”发送gRPC请求来完成这项工作呢?
关于存储,您可以使用这些选项中的任何一个,但是Mongo或Postgres与Debezium的集成会更好,如果您想使用它来实际将数据导入Kafka的话。
bqf10yzr2#
使用基于拉的Kafka时,最好使用gRPC客户端从Kafka获取消息,因此它们之间不需要缓冲区。