如何为使用者添加oauthbearer支持-如何刷新jwt令牌

bvjxkvbb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(210)

下面是我从kafka消费的代码,它需要一个刷新令牌。我有一个函数返回一个jwt令牌。但如何让go客户端刷新?当我运行时,我没有得到错误,但我也没有得到流。

Created Consumer rdkafka#consumer-1
Ignored OAuthBearerTokenRefresh

我试着模仿这个链接https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"reflect"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
"golang.org/x/oauth2/clientcredentials"
)

func handleOAuthBearerTokenRefreshEvent(client kafka.Handle, e kafka.OAuthBearerTokenRefresh) {
oauthBearerToken, retrieveErr := retrieveUnsecuredToken(e)
if retrieveErr != nil {
    fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
    client.SetOAuthBearerTokenFailure(retrieveErr.Error())
} else {
    setTokenError := client.SetOAuthBearerToken(oauthBearerToken)
    if setTokenError != nil {
        fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError)
        client.SetOAuthBearerTokenFailure(setTokenError.Error())
    }
}
}

    func retrieveUnsecuredToken(e kafka.OAuthBearerTokenRefresh) (kafka.OAuthBearerToken, error) {

    //https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go

    conf := &clientcredentials.Config{
        ClientID:     "test",
        ClientSecret: "test",
        TokenURL:     "https://test.com/auth/realms/pro-realm/protocol/openid-connect/token",
    }
    token, _ := conf.Token(context.Background())
    extensions := map[string]string{}
    oauthBearerToken := kafka.OAuthBearerToken{
        TokenValue: token.AccessToken,
        Expiration: token.Expiry,
        Principal:  "principal=dude",
        Extensions: extensions,
    }
    fmt.Println(oauthBearerToken)
    return oauthBearerToken, nil
}

func main() {

    /*
        if len(os.Args) < 4 {
            fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
                os.Args[0])
            os.Exit(1)
        }

    */

    broker := os.Args[1]
    group := os.Args[2]
    topics := os.Args[3:]
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    oauthConf := "principal=dude"

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":                   broker,
        "security.protocol":                   "SASL_PLAINTEXT",
        "sasl.mechanisms":                     "OAUTHBEARER",
        "sasl.oauthbearer.config":             oauthConf,
        "broker.address.family":               "v4",
        "group.id":                            group,
        "session.timeout.ms":                  6000,
        "enable.ssl.certificate.verification": false,
        "auto.offset.reset":                   "earliest"})

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)
    err = c.SubscribeTopics(topics, nil)
    run := true

go func(eventsChan chan kafka.Event) {
    for ev := range eventsChan {
        oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
        if !ok {
            // Ignore other event types
            continue
        }

        handleOAuthBearerTokenRefreshEvent(c, oart)
    }
}(c.Events())

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                if e.Code() == kafka.ErrAllBrokersDown {
                    run = false
                }
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()

}

更新:
如果我删除go func(eventschan chan kafka.event)并在投票中添加以下内容,if将起作用:

case kafka.OAuthBearerTokenRefresh:
            oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
            fmt.Println(oart)
            if !ok {
                // Ignore other event types
                continue
            }
            handleOAuthBearerTokenRefreshEvent(c, oart)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题