I have a Python Kafka consumer (kafka-python); its code is:
from kafka import KafkaConsumer
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

class TokenProvider(object):
    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret

    def token(self):
        # Fetch an access token using the OAuth2 client-credentials flow.
        token_url = 'https://test.com/protocol/openid-connect/token'
        client = BackendApplicationClient(client_id=self.client_id)
        oauth = OAuth2Session(client=client)
        token_json = oauth.fetch_token(token_url=token_url, client_id=self.client_id, client_secret=self.client_secret)
        return token_json['access_token']

consumer = KafkaConsumer(
    group_id=None,
    bootstrap_servers=['test.com:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=TokenProvider(client_id, client_secret),
    ssl_check_hostname=False,
    ssl_context=create_ssl_context(),
    auto_offset_reset=offset,
    enable_auto_commit=False,
    value_deserializer=lambda m: decode(m)
)
consumer.subscribe(topics=['test.stream'])
My Confluent Python (confluent-kafka) code is below, and it fails with this error:
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}
import uuid
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'test.com:9094',
    'sasl.mechanism': 'OAUTHBEARER',
    'security.protocol': 'SASL_SSL',
    'oauthbearer_token_refresh_cb': TokenProvider(client_id, client_secret),
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest'
})
c.subscribe(['test.stream'])
So how do I get Confluent Kafka to work? I seem to have a problem using oauthbearer_token_refresh_cb with OAUTHBEARER and SASL_SSL.
Essentially, I authenticate with a JWT token.
2 Answers
5hcedyr01#
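According to the documentation at https://github.com/edenhill/librdkafka/blob/master/configuration.md, the oauthbearer_token_refresh_cb option must be set using rd_kafka_conf_set_oauthbearer_token_refresh_cb(). Note, however, that you are trying to set it to a TokenProvider instance, which is not callable, so you probably want to pass TokenProvider(...).token instead.
The documentation describes the property as follows:
"SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et al.). This callback will be triggered when it is time to refresh the client's OAUTHBEARER token."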
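To illustrate the point about callables, here is a minimal sketch rather than a confirmed fix. It assumes a confluent-kafka-python release that documents the oauth_cb configuration property, a callback that receives the sasl.oauthbearer.config string and returns a (token, expiry_time) tuple, with expiry_time in seconds since the epoch. The TokenProvider below is a stand-in that returns a fixed string instead of calling the token endpoint, and make_oauth_cb, lifetime_seconds and the group id are hypothetical names and values chosen for the example.

```python
import time


class TokenProvider:
    """Stand-in for the question's TokenProvider; returns a fixed string
    instead of calling the OAuth2 token endpoint."""

    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret

    def token(self):
        return "dummy-access-token"


def make_oauth_cb(provider, lifetime_seconds=300):
    """Wrap provider.token in a callable shaped like an oauth_cb callback.

    lifetime_seconds is an assumed token lifetime; a real provider should
    use the expiry reported by the token endpoint instead.
    """
    def oauth_cb(config_str):
        # config_str carries the value of sasl.oauthbearer.config (unused here).
        return provider.token(), time.time() + lifetime_seconds
    return oauth_cb


provider = TokenProvider("my-client-id", "my-client-secret")

print(callable(provider))        # the instance itself is not callable
print(callable(provider.token))  # the bound method is

# Configuration dict that could be passed to confluent_kafka.Consumer(conf),
# assuming the installed release supports oauth_cb.
conf = {
    "bootstrap.servers": "test.com:9094",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER",
    "oauth_cb": make_oauth_cb(provider),
    "group.id": "example-group",
    "auto.offset.reset": "earliest",
}
```

The wrapper keeps the token-fetching logic in one place, so the same TokenProvider could serve both the kafka-python consumer and the Confluent one. Whether this works depends on the installed client version, so check it against the client's own documentation.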
xghobddn2#
Judging from the source code, the Python and Go clients do not support OAUTHBEARER yet.