for {
select {
case <-ctx.Done():
return false
default:
if client.Conn.GetState() != connectivity.Ready {
client.Conn.Connect()
}
// reserve a short duration (customizable) for conn to change state from idle to ready if grpc server is up
time.Sleep(500 * time.Millisecond)
if client.Conn.GetState() == connectivity.Ready {
return true
}
// define reconnect time interval (backoff) or/and reconnect attempts here
time.Sleep(2 * time.Second)
}
}
for {
select {
case <-ctx.Done():
return
case <-reconnectCh:
if client.Conn.GetState() != connectivity.Ready && *isConnectedWebSocket {
if o.waitUntilReady(client, isConnectedWebSocket, ctx) {
err := o.generateNewProcessOrderStream(client, ctx)
if err != nil {
logger.Logger.Error("failed to establish stream connection to grpc server ...")
}
// re-listening server side streaming
go o.listenProcessOrderServerSide(client, reconnectCh, ctx, isConnectedWebSocket)
}
}
}
}
注意监听任务是由另一个goroutine并发处理的。
// listening server side streaming
go o.listenProcessOrderServerSide(client, reconnectCh, websocketCtx, isConnectedWebSocket)
1条答案
按热度按时间wljmcqd81#
当gRPC连接关闭时,gRPC客户端连接的状态将是
IDLE
或TRANSIENT_FAILURE
。下面是我在Go语言中为gRPC双向流定制的重新连接机制的示例。首先,我有一个for循环来保持重新连接,直到gRPC服务器启动,在调用conn.Connect()
后状态将变为ready。另外,为了执行重新连接任务,会生成一个goroutine,在重新连接成功后,它会生成另一个goroutine来监听gRPC服务器。
注意监听任务是由另一个goroutine并发处理的。
你可以看看我的代码示例here,希望能有所帮助。