Currently, I am using PlainLoginModule to authenticate users. However, I have now created a jar from the code listed below and want to use it in place of PlainLoginModule. The code is based on the sample callback handler for SASL/PLAIN in KIP-86 (Configurable SASL callback handlers): https://cwiki.apache.org/confluence/display/kafka/kip-86%3a+configurable+sasl+callback+handlers
I have placed the jar file in the ~/libs folder and added listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.synopsys.demo.DemoApplication
to my server.properties. My kafka_server_jaas.conf contains:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
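For context on the JAAS entries above: with Kafka's built-in SASL/PLAIN server handling, `username` and `password` are the credentials the broker itself presents on inter-broker connections, while each `user_<name>` option defines a user the broker accepts. The sketch below imitates that lookup over a plain `Map` instead of Kafka's real classes. `JaasUserLookup` and `isValid` are hypothetical names, and the constant-time comparison is an illustrative choice rather than something the post specifies.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

public class JaasUserLookup {
    // Hypothetical helper: checks a login against "user_<name>" options,
    // the naming convention used in the KafkaServer JAAS section above.
    public static boolean isValid(Map<String, String> jaasOptions, String username, char[] password) {
        if (username == null || password == null) {
            return false;
        }
        String expected = jaasOptions.get("user_" + username);
        if (expected == null) {
            return false; // no such user configured
        }
        // Constant-time comparison so timing does not reveal how much matched.
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                new String(password).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("username", "admin");
        options.put("password", "admin-secret");
        options.put("user_admin", "admin-secret");

        System.out.println(isValid(options, "admin", "admin-secret".toCharArray()));
        System.out.println(isValid(options, "admin", "wrong".toCharArray()));
        System.out.println(isValid(options, "alice", "admin-secret".toCharArray()));
    }
}
```

A custom handler that authenticates against LDAP or a database would not need the `user_<name>` entries for client logins, which is probably why the answer further down leaves the JAAS file untouched.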
When I start the server, I get an error:
Part 1:
14:36:41.924 [main] DEBUG org.apache.kafka.common.network.Selector - [KafkaServer id=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] INFO kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1 connected to swe-analyticsdb-prod2:9093 (id: 1 rack: null) for sending state change requests
14:36:41.925 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-1] DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=1] Connection with swe-analyticsdb-prod2.internal.synopsys.com/10.15.164.233 disconnected
java.io.EOFException: null
>---at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
>---at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
>---at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>---at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>---at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at kafka.network.Processor.poll(SocketServer.scala:830)
>---at kafka.network.Processor.run(SocketServer.scala:730)
>---at java.lang.Thread.run(Thread.java:748)
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
14:36:41.928 [main] WARN kafka.utils.CoreUtils$ - org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
java.lang.NoSuchMethodError: org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
>---at kafka.server.KafkaServer.doControlledShutdown$1(KafkaServer.scala:520)
>---at kafka.server.KafkaServer.controlledShutdown(KafkaServer.scala:563)
>---at kafka.server.KafkaServer.$anonfun$shutdown$2(KafkaServer.scala:585)
>---at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
>---at kafka.server.KafkaServer.shutdown(KafkaServer.scala:585)
>---at kafka.server.KafkaServer.startup(KafkaServer.scala:342)
>---at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
>---at kafka.Kafka$.main(Kafka.scala:75)
>---at kafka.Kafka.main(Kafka.scala)
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutting down
14:36:41.929 [/config/changes-event-process-thread] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Stopped
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutdown completed
14:36:41.930 [main] INFO kafka.network.SocketServer - [SocketServer brokerId=1] Stopping socket server request processors
14:36:41.931 [data-plane-kafka-socket-acceptor-ListenerName(SASL_SSL)-SASL_SSL-9093] DEBUG kafka.network.Acceptor - Closing server socket and selector.
14:36:41.933 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector - processor 0
14:36:41.934 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector connection 10.15.164.233:9093-10.15.164.233:44774-0
14:36:41.935 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
Part 2:
07:22:21.223 [main] DEBUG kafka.utils.KafkaScheduler - Shutting down task scheduler.
07:22:21.223 [main] INFO kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper - [ExpirationReaper-1-Heartbeat]: Shutting down
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
>---at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>---at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>---at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
>---at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.255 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.
07:22:21.256 [Controller-1-to-broker-1-send-thread] WARN kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1's connection to broker swe-analyticsdb-prod2:9093 (id: 1 rack: null) was unsuccessful
java.io.IOException: Connection to swe-analyticsdb-prod2:9093 (id: 1 rack: null) failed.
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
>---at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>---at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>---at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
>---at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.357 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.
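Update: I noticed that this behavior occurs even if I don't use the jar/class I created, as long as the jar is placed in the "./libs" directory. Whether I use the built-in or the custom AuthenticateCallbackHandler class, the errors above always occur.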
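A `NoSuchMethodError` like the one in part 1 usually means a class was compiled against one version of a library while a different version was loaded at runtime. That would fit a jar in `./libs` that brings along its own copies of Kafka classes. One way to check is to print where a class was actually loaded from. The sketch below uses only JDK calls; `WhichJar` and `locationOf` are hypothetical names. To be useful for this problem it would have to run with the broker's classpath and be given a Kafka class name such as `org.apache.kafka.common.requests.ControlledShutdownRequest`.

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the jar or directory a class was loaded from. Classes without
    // a code source (typically core JDK classes) and unknown classes yield
    // a marker string instead.
    public static String locationOf(String className) {
        try {
            // initialize=false: locate the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "<no code source>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not found>";
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

If the reported location for a Kafka class is the custom jar rather than the broker's own `kafka-clients` jar, the custom jar is shadowing broker classes.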
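Am I missing a step? I know I have to add the jar to Kafka so that it can recognize and use it, but I have not found any tutorial or documentation explaining how to use a custom callback handler with PLAIN. Does anyone know how to do this?
I am using Kafka 2.2.
My custom class code: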
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

import javax.naming.AuthenticationException;
import javax.naming.AuthenticationNotSupportedException;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

public class CustomCallback implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // No configuration is needed; the LDAP settings are hard-coded below.
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                // The NameCallback carries the username and arrives before the password callback.
                username = ((NameCallback) callback).getDefaultName();
            } else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    // Returns true if an LDAP simple bind with the given credentials succeeds.
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null) {
            return false;
        }
        Hashtable<String, String> environment = new Hashtable<String, String>();
        System.out.println("Custom class is being called");
        environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        environment.put(Context.PROVIDER_URL, "ldap://adldap.internal.synopsys.com:389");
        environment.put(Context.SECURITY_AUTHENTICATION, "simple");
        environment.put(Context.SECURITY_PRINCIPAL, "CN=" + username + ",CN=Users,DC=internal,DC=synopsys,DC=com");
        environment.put(Context.SECURITY_CREDENTIALS, new String(password));
        try {
            // Creating the context performs the bind; a successful bind means the password is valid.
            DirContext context = new InitialDirContext(environment);
            context.getEnvironment();
            context.close();
            return true;
        } catch (AuthenticationNotSupportedException exception) {
            System.out.println("The authentication is not supported by the server");
            return false;
        } catch (AuthenticationException exception) {
            // javax.naming.AuthenticationException: the server rejected the credentials.
            System.out.println("Incorrect password or username");
            return false;
        } catch (NamingException exception) {
            System.out.println("Error when trying to create the context");
            return false;
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    // Standalone check of the LDAP bind, independent of the broker.
    public static void main(String[] args) throws IOException {
        char[] pass = new char[]{'P', '0', 'm', 'e', 'l', '0', '2', '0', '1', '9', '!'};
        CustomCallback test = new CustomCallback();
        System.out.println(test.authenticate("<username>", pass));
        System.out.println(test.getClass().getName());
        //SpringApplication.run(DemoApplication.class, args);
    }
}
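Two details of the `authenticate` method above are worth guarding against. An LDAP simple bind with an empty password is treated by some servers as an unauthenticated bind that succeeds, and a username containing characters such as `,` changes the structure of the concatenated DN. A hedged sketch of building the bind environment with both checks follows. `LdapBindEnv` and `build` are hypothetical names, the URL and base DN in `main` are placeholders, and `Rdn.escapeValue` is the JDK helper for escaping an RDN value.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.ldap.Rdn;

public class LdapBindEnv {
    // Builds the JNDI environment for a simple bind, or returns null when the
    // credentials should be rejected without contacting the server at all.
    public static Hashtable<String, String> build(String url, String baseDn, String username, char[] password) {
        if (username == null || username.isEmpty() || password == null || password.length == 0) {
            return null; // an empty password could turn into an unauthenticated bind
        }
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        // Escape the user-supplied value so it cannot alter the DN structure.
        env.put(Context.SECURITY_PRINCIPAL, "CN=" + Rdn.escapeValue(username) + "," + baseDn);
        env.put(Context.SECURITY_CREDENTIALS, new String(password));
        return env;
    }

    public static void main(String[] args) {
        String url = "ldap://ldap.example.com:389";       // placeholder
        String baseDn = "CN=Users,DC=example,DC=com";     // placeholder
        Hashtable<String, String> env = build(url, baseDn, "alice", "secret".toCharArray());
        System.out.println(env.get(Context.SECURITY_PRINCIPAL));
        System.out.println(build(url, baseDn, "alice", new char[0]));
    }
}
```

The returned table could be passed to `new InitialDirContext(env)` exactly as in the class above; a `null` result would map to `return false`.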
server.properties contents:
advertised.listeners=SASL_SSL://<machine name>:9093
ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
ssl.truststore.location=/remote/sde108/kafka/kafka/SSL2/client/server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/remote/sde108/kafka/kafka/SSL2/client/server.keystore.jks
ssl.keystore.password=password
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=false
listeners=SASL_SSL://<machine name>:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
offsets.retention.minutes=1
# listener.name.sasl_sasl.plain.sasl.server.callback.handler.class=<package name>.CustomCallbackApplication
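Kafka expects the server callback handler property to be prefixed with the listener name and the SASL mechanism, both in lower case, so for a `SASL_SSL` listener with `PLAIN` the key starts with `listener.name.sasl_ssl.plain.`. The commented-out line above uses `sasl_sasl`, which would not match that listener if it were enabled as written. The sketch below derives the expected key and checks a `Properties` object for it. `HandlerKeyCheck` and its methods are hypothetical helpers, not part of Kafka, and the handler class name in `main` is a placeholder.

```java
import java.util.Locale;
import java.util.Properties;

public class HandlerKeyCheck {
    // Builds the listener- and mechanism-prefixed property name.
    public static String expectedKey(String listenerName, String mechanism) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT)
                + "." + mechanism.toLowerCase(Locale.ROOT)
                + ".sasl.server.callback.handler.class";
    }

    // True if the properties contain a non-empty handler class under the expected key.
    public static boolean isConfigured(Properties props, String listenerName, String mechanism) {
        String value = props.getProperty(expectedKey(listenerName, mechanism));
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same prefix as the commented-out line in the question.
        props.setProperty("listener.name.sasl_sasl.plain.sasl.server.callback.handler.class",
                "com.example.CustomCallback");
        System.out.println(expectedKey("SASL_SSL", "PLAIN"));
        System.out.println(isConfigured(props, "SASL_SSL", "PLAIN"));
    }
}
```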
pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>synopsys</groupId>
    <artifactId>synopsys</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
The META-INF was generated using Project Structure and Build Artifacts.
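If the IDE artifact bundles the project's dependencies, the jar will contain its own copy of Kafka classes (the POM above declares `kafka_2.12` 2.2.0 with no `provided` scope), and those copies can shadow the broker's classes once the jar sits next to the broker's own jars. That is an assumption about this build, not something the logs prove. A small sketch for checking a jar for bundled `org/apache/kafka/` or `kafka/` classes, using only `java.util.jar`; `JarContentCheck` is a hypothetical name.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarContentCheck {
    // Lists .class entries in the jar whose path starts with one of the prefixes.
    public static List<String> bundledClasses(String jarPath, String... prefixes) throws IOException {
        List<String> hits = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (!name.endsWith(".class")) {
                    continue;
                }
                for (String prefix : prefixes) {
                    if (name.startsWith(prefix)) {
                        hits.add(name);
                        break;
                    }
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: JarContentCheck <path-to-jar>");
            return;
        }
        List<String> hits = bundledClasses(args[0], "org/apache/kafka/", "kafka/");
        System.out.println(hits.isEmpty()
                ? "no bundled Kafka classes found"
                : hits.size() + " bundled Kafka classes, e.g. " + hits.get(0));
    }
}
```

A handler jar meant to be dropped into the broker would normally contain only the handler's own classes, since the broker already provides the Kafka ones.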
1 Answer
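I think I replied to you two days ago. I customized the SASL/PLAIN authentication mechanism by storing the usernames/passwords in MySQL. I also found KIP-86 very confusing, because it offers several ways of doing the same thing without explaining how they differ.
Here is what worked for me.
The interface I implemented is AuthenticateCallbackHandler.
The resulting jar should not be placed under ~/libs. There is a libs subdirectory under the directory where you installed Kafka; put the jar there.
I did not modify kafka_server_jaas.conf.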