Kafka issue: TOPIC_AUTHORIZATION_FAILED

Posted by 4sup72z8 on 2021-06-04 in Kafka

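I am trying to set up Kafka authorization locally using Keycloak. When I try to produce a message to a topic, it fails with "TOPIC_AUTHORIZATION_FAILED". I have tried several things, with no luck. Here are my configuration files: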

server.properties


########## SECURITY using OAUTHBEARER authentication ###############

sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093

# Authorizer for ACL

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:kafkabroker;

################ OAuth Classes #####################

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required OAUTH_LOGIN_SERVER=<auth-server-url> OAUTH_LOGIN_ENDPOINT='/oauth2/default/v1/token' OAUTH_LOGIN_GRANT_TYPE=client_credentials OAUTH_LOGIN_SCOPE=broker.kafka OAUTH_AUTHORIZATION='Basic <encoded-clientId:clientsecret>' OAUTH_INTROSPECT_SERVER=<auth-server-url> OAUTH_INTROSPECT_ENDPOINT='/oauth2/default/v1/introspect' OAUTH_INTROSPECT_AUTHORIZATION='Basic <encoded-clientId:clientsecret>';
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler

########## SECURITY using OAUTHBEARER authentication ###############

auto.create.topics.enable=true

kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    LoginStringClaim_sub="kafkabroker";
};

kafkastart.sh (exports the environment variables and starts the Kafka server)

export KAFKA_OPTS="
-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf 
-DOAUTH_WITH_SSL=true 
-DOAUTH_LOGIN_SERVER=<keycloak-server>
-DOAUTH_LOGIN_ENDPOINT=/auth/realms/<realm>/protocol/openid-connect/token  
-DOAUTH_LOGIN_GRANT_TYPE=client_credentials 
-DOAUTH_LOGIN_SCOPE=kafka 
-DOAUTH_INTROSPECT_SERVER=<keycloak-server>  
-DOAUTH_INTROSPECT_ENDPOINT=/auth/realms/<realm>/protocol/openid-connect/token/introspect 
-DOAUTH_AUTHORIZATION=Basic%20<encoded-clientId:clientsecret> 
-DOAUTH_INTROSPECT_AUTHORIZATION=Basic%20<encoded-clientId:clientsecret>"
./bin/kafka-server-start.sh ./config/server.properties

producer-config.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required OAUTH_LOGIN_SERVER=<keycloak-server> OAUTH_LOGIN_ENDPOINT='/auth/realms/<realm>/protocol/openid-connect/token' OAUTH_LOGIN_GRANT_TYPE=client_credentials OAUTH_LOGIN_SCOPE=kafka OAUTH_AUTHORIZATION='Basic <encoded client_id:client_secret>' OAUTH_INTROSPECT_SERVER=<keycloak-server> OAUTH_INTROSPECT_ENDPOINT='/auth/realms/<realm>/protocol/openid-connect/token/introspect' OAUTH_INTROSPECT_AUTHORIZATION='Basic <encoded client_id:client_secret>';
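
The `Basic <encoded client_id:client_secret>` placeholders above presumably hold a standard HTTP Basic credential, that is, the Base64 encoding of `client_id:client_secret`. A minimal Python sketch of how such a value can be produced follows; the function name and the secret are hypothetical and are not part of the original configuration.

```python
import base64


def basic_auth_header(client_id: str, client_secret: str) -> str:
    """Build an HTTP Basic authorization value from a client id and secret."""
    # HTTP Basic credentials are base64("<id>:<secret>").
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # "example-secret" is a placeholder, not a real credential.
    print(basic_auth_header("kafkaproducerapp", "example-secret"))
```

The printed string is what would go into OAUTH_AUTHORIZATION and OAUTH_INTROSPECT_AUTHORIZATION, assuming the callback handler passes it through unchanged as the Authorization header.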

Kafka logs:

[2020-11-30 16:16:19,933] INFO [SocketServer brokerId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2020-11-30 16:16:19,934] INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-30 16:16:19,934] INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-30 16:16:19,934] INFO Kafka startTimeMs: 1606734979934 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-30 16:16:19,937] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2020-11-30 16:16:19,958] INFO Trying to introspect Token! (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:16:19,958] INFO Try to introspect with oauth! (com.oauth2.security.oauthbearer.OAuthHttpCalls)
Oauth Introspect Server:<keycloak-server>
Oauth Introspect EndPoint:/auth/realms/<realm>/protocol/openid-connect/token/introspect
Oauth Authorization:Basic <encoded clientid:clientsecret>
[2020-11-30 16:16:20,268] INFO Trying to introspected (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:16:20,269] INFO Validated! token.. (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:16:20,351] ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=0, api=UPDATE_METADATA, version=6, body={controller_id=0,controller_epoch=18,broker_epoch=1858,topic_states=[],live_brokers=[{id=0,endpoints=[{port=9093,host=localhost,listener=SASL_PLAINTEXT,security_protocol=2,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:38296-0, session=Session(User:a8111a08-3ed1-46cf-ad7a-06898c4eb9fa,/127.0.0.1), listenerName=ListenerName(SASL_PLAINTEXT), securityProtocol=SASL_PLAINTEXT, buffer=null) is not authorized.
[2020-11-30 16:16:20,363] ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=1, api=LEADER_AND_ISR, version=4, body={controller_id=0,controller_epoch=18,broker_epoch=1858,topic_states=[{topic_name=test,partition_states=[{partition_index=0,controller_epoch=15,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],adding_replicas=[],removing_replicas=[],is_new=false,_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic2,partition_states=[{partition_index=0,controller_epoch=9,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],adding_replicas=[],removing_replicas=[],is_new=false,_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic3,partition_states=[{partition_index=0,controller_epoch=9,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],adding_replicas=[],removing_replicas=[],is_new=false,_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic4,partition_states=[{partition_index=0,controller_epoch=11,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],adding_replicas=[],removing_replicas=[],is_new=false,_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic,partition_states=[{partition_index=0,controller_epoch=2,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],adding_replicas=[],removing_replicas=[],is_new=false,_tagged_fields={}}],_tagged_fields={}}],live_leaders=[{broker_id=0,host_name=localhost,port=9093,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:38296-0, session=Session(User:a8111a08-3ed1-46cf-ad7a-06898c4eb9fa,/127.0.0.1), listenerName=ListenerName(SASL_PLAINTEXT), securityProtocol=SASL_PLAINTEXT, buffer=null) is not authorized.
[2020-11-30 16:16:20,368] ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=2, api=UPDATE_METADATA, version=6, body={controller_id=0,controller_epoch=18,broker_epoch=1858,topic_states=[{topic_name=test,partition_states=[{partition_index=0,controller_epoch=15,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],offline_replicas=[],_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic2,partition_states=[{partition_index=0,controller_epoch=9,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],offline_replicas=[],_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic3,partition_states=[{partition_index=0,controller_epoch=9,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],offline_replicas=[],_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic4,partition_states=[{partition_index=0,controller_epoch=11,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],offline_replicas=[],_tagged_fields={}}],_tagged_fields={}},{topic_name=oauth2-demo-topic,partition_states=[{partition_index=0,controller_epoch=2,leader=0,leader_epoch=0,isr=[0],zk_version=0,replicas=[0],offline_replicas=[],_tagged_fields={}}],_tagged_fields={}}],live_brokers=[{id=0,endpoints=[{port=9093,host=localhost,listener=SASL_PLAINTEXT,security_protocol=2,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:38296-0, session=Session(User:a8111a08-3ed1-46cf-ad7a-06898c4eb9fa,/127.0.0.1), listenerName=ListenerName(SASL_PLAINTEXT), securityProtocol=SASL_PLAINTEXT, buffer=null) is not authorized.
[2020-11-30 16:16:33,523] INFO Trying to introspect Token! (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:16:33,523] INFO Try to introspect with oauth! (com.oauth2.security.oauthbearer.OAuthHttpCalls)
Oauth Introspect Server:<keycloak-server>
Oauth Introspect EndPoint:/auth/realms/<realm>/protocol/openid-connect/token/introspect
Oauth Authorization:Basic <encoded clientid:clientsecret>
[2020-11-30 16:16:34,704] INFO Trying to introspected (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:16:34,705] INFO Validated! token.. (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2020-11-30 16:26:19,282] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Producer error messages:

[2020-11-30 16:16:52,948] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 174 : {oauth2-demo-topic51=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2020-11-30 16:16:53,053] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 175 : {oauth2-demo-topic51=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

Here are the ACLs I tried:

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=PREFIXED)`: 
    (principal=User:kafkaproducerapp, host=*, operation=ALL, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=ALL, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=oauth2-demo-topic2, patternType=LITERAL)`: 
    (principal=User:kafkabroker, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=oauth2-demo-topic3, patternType=LITERAL)`: 
    (principal=User:kafkaproducerapp, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=CREATE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=oauth2-demo-topic, patternType=LITERAL)`: 
    (principal=User:kafkaproducerapp, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=ALL, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:*, host=0.0.0.0, operation=WRITE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=DESCRIBE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`: 
    (principal=User:kafkaproducerapp, host=*, operation=ALL, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=oauth2-demo-topic4, patternType=LITERAL)`: 
    (principal=User:kafkaproducerapp, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:console-producer, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=ALL, permissionType=ALLOW)
    (principal=User:*, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:console-producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:console-producer, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:kafkabroker, host=*, operation=WRITE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=oauth2-demo-topic5, patternType=LITERAL)`: 
    (principal=User:kafkaproducerapp, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:kafkaproducerapp, host=*, operation=ALL, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
    (principal=User:*, host=localhost, operation=WRITE, permissionType=ALLOW)
    (principal=User:*, host=127.0.0.1, operation=WRITE, permissionType=ALLOW)
    (principal=User:*, host=127.0.0.1, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:*, host=127.0.0.1, operation=CREATE, permissionType=ALLOW)
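
When reading the list above, it can help to model how an ALLOW entry is matched against a request. The Python sketch below is a deliberately simplified model and not Kafka's actual authorizer code: it ignores DENY rules, super users, prefixed patterns, and implied operations, and the `Acl` and `is_allowed` names are hypothetical. It uses the ACLs listed for oauth2-demo-topic5 and the session principal that appears in the broker log.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str  # e.g. "User:kafkaproducerapp" or "User:*"
    host: str       # e.g. "*" or "127.0.0.1"
    operation: str  # e.g. "WRITE", "DESCRIBE", "ALL"


def is_allowed(acls, principal: str, host: str, operation: str) -> bool:
    """Return True if any ALLOW entry matches principal, host and operation."""
    for acl in acls:
        principal_ok = acl.principal in (principal, "User:*")
        host_ok = acl.host in (host, "*")
        operation_ok = acl.operation in (operation, "ALL")
        if principal_ok and host_ok and operation_ok:
            return True
    return False


# ACLs copied from the oauth2-demo-topic5 listing.
topic5_acls = [
    Acl("User:kafkaproducerapp", "*", "WRITE"),
    Acl("User:kafkaproducerapp", "*", "DESCRIBE"),
    Acl("User:kafkaproducerapp", "*", "CREATE"),
    Acl("User:kafkaproducerapp", "*", "ALL"),
]

# Principal taken from the ClusterAuthorizationException lines in the broker log.
logged_principal = "User:a8111a08-3ed1-46cf-ad7a-06898c4eb9fa"

if __name__ == "__main__":
    print(is_allowed(topic5_acls, logged_principal, "127.0.0.1", "WRITE"))         # False
    print(is_allowed(topic5_acls, "User:kafkaproducerapp", "127.0.0.1", "WRITE"))  # True
```

Under this model, an entry written for a named principal never matches a session whose principal is a UUID, however many operations are granted to the name.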

Please let me know how I can resolve this issue. Thanks.

6vl6ewon (answer #1)

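After a closer look, I found that the problem was in the binary that integrates Apache Kafka with Keycloak, that is, the build output (jar) that has to be copied into the /kafka/libs directory. In its source code, I noticed that the "sub" field is used as the client ID. In the Keycloak token, however, the client ID is in the "clientid" field, not in "sub". This is consistent with the broker log in the question, where the session principal is User:a8111a08-3ed1-46cf-ad7a-06898c4eb9fa rather than one of the names used in the ACLs. So I changed "sub" to "clientid" in the source code of that binary, rebuilt it, and copied the new jar into the libs directory. It worked like a charm.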
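
The effect of that change can be illustrated with a small Python sketch. It is not the handler's real code (which is Java and is not shown in this thread); the `principal_from_claims` function and the sample introspection response are hypothetical, and pairing this particular UUID with the name kafkabroker is an assumption made for illustration only.

```python
import json


def principal_from_claims(claims: dict, claim_name: str) -> str:
    """Build a Kafka-style principal string from one claim of a token."""
    return "User:" + claims[claim_name]


# Hypothetical introspection response; a real Keycloak response has more fields.
sample_response = json.loads("""
{
  "active": true,
  "sub": "a8111a08-3ed1-46cf-ad7a-06898c4eb9fa",
  "clientid": "kafkabroker"
}
""")

if __name__ == "__main__":
    # Reading "sub" yields a UUID that no ACL or super.users entry names.
    print(principal_from_claims(sample_response, "sub"))
    # Reading "clientid" yields the name that the ACLs and super.users refer to.
    print(principal_from_claims(sample_response, "clientid"))
```

To check which claims a given token actually carries, the introspection endpoint's JSON response can be inspected before deciding which field the handler should read.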
