下面是我从kafka消费的代码,它需要一个刷新令牌。我有一个函数返回一个jwt令牌。但如何让go客户端刷新?当我运行时,我没有得到错误,但我也没有得到流。
Created Consumer rdkafka#consumer-1
Ignored OAuthBearerTokenRefresh
我试着模仿这个链接https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"reflect"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
"golang.org/x/oauth2/clientcredentials"
)
func handleOAuthBearerTokenRefreshEvent(client kafka.Handle, e kafka.OAuthBearerTokenRefresh) {
oauthBearerToken, retrieveErr := retrieveUnsecuredToken(e)
if retrieveErr != nil {
fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
client.SetOAuthBearerTokenFailure(retrieveErr.Error())
} else {
setTokenError := client.SetOAuthBearerToken(oauthBearerToken)
if setTokenError != nil {
fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError)
client.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
}
func retrieveUnsecuredToken(e kafka.OAuthBearerTokenRefresh) (kafka.OAuthBearerToken, error) {
//https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go
conf := &clientcredentials.Config{
ClientID: "test",
ClientSecret: "test",
TokenURL: "https://test.com/auth/realms/pro-realm/protocol/openid-connect/token",
}
token, _ := conf.Token(context.Background())
extensions := map[string]string{}
oauthBearerToken := kafka.OAuthBearerToken{
TokenValue: token.AccessToken,
Expiration: token.Expiry,
Principal: "principal=dude",
Extensions: extensions,
}
fmt.Println(oauthBearerToken)
return oauthBearerToken, nil
}
func main() {
/*
if len(os.Args) < 4 {
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
os.Args[0])
os.Exit(1)
}
*/
broker := os.Args[1]
group := os.Args[2]
topics := os.Args[3:]
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
oauthConf := "principal=dude"
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "OAUTHBEARER",
"sasl.oauthbearer.config": oauthConf,
"broker.address.family": "v4",
"group.id": group,
"session.timeout.ms": 6000,
"enable.ssl.certificate.verification": false,
"auto.offset.reset": "earliest"})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
run := true
go func(eventsChan chan kafka.Event) {
for ev := range eventsChan {
oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
if !ok {
// Ignore other event types
continue
}
handleOAuthBearerTokenRefreshEvent(c, oart)
}
}(c.Events())
for run == true {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
更新:
如果我删除go func(eventschan chan kafka.event)并在投票中添加以下内容,if将起作用:
case kafka.OAuthBearerTokenRefresh:
oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
fmt.Println(oart)
if !ok {
// Ignore other event types
continue
}
handleOAuthBearerTokenRefreshEvent(c, oart)
暂无答案!
目前还没有任何答案,快来回答吧!