这是我的代码,我试图从谷歌酒吧子阅读消息
func (p *PubSubSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest, messageCh chan<- sourcesdk.Message) {
if len(p.messages) > 0 {
return
}
log.Println("Request timeout", readRequest.TimeOut())
ctx, cancelFunc := context.WithTimeout(context.Background(), readRequest.TimeOut())
defer cancelFunc()
cctx, cancel := context.WithCancel(ctx)
receiveMesg := make(chan *pubsub.Message)
p.subscription.ReceiveSettings.MaxOutstandingMessages = int(readRequest.Count())
go p.subscription.Receive(cctx, func(_ context.Context, msg *pubsub.Message) {
fmt.Printf("Got message------------------: %s\n", string(msg.Data))
receiveMesg <- msg
})
for {
select {
case <-ctx.Done():
log.Println("Timeout done ------------************************")
cancel()
return
case msg := <-receiveMesg:
log.Println("Executing Loop--------")
p.lock.Lock()
messageCh <- sourcesdk.NewMessage(
msg.Data,
sourcesdk.NewOffset([]byte(msg.ID), "0"),
msg.PublishTime,
)
p.messages[msg.ID] = msg
p.lock.Unlock()
default:
continue
}
}
}
字符串
问题是case <-ctx.Done()
:never gets executed,why?.我想在超时结束时退出父函数,但ctx.Done()
从未被调用,我这样做对吗?有更好的方法来实现它吗?
1条答案
按热度按时间bjp0bcyl1#
您正在将
cctx
传递给p.subscription.Receive
,而不是ctx
。当取消子上下文时,父上下文将不会取消。你必须检查
cctx
是否被取消。字符串