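I am developing a Siddhi application that has to connect to a Confluent Kafka server, so I need to pass the username/password correctly through optional.configuration. I pass it in @source (as shown below), but I get the error: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set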
Code:
@source(type='kafka',
topic.list='siddhi-test-topic',
partition.no.list='0,1',
threading.option='single.thread',
group.id='siddhiGroupId',
bootstrap.servers='abc.confluent.cloud:9092',
optional.configuration="security.protocol:SASL_SSL,ssl.endpoint.identification.algorithm:https,
sasl.mechanism:PLAIN,sasl.jaas.config:org.apache.kafka.common.security.plain.PlainLoginModule
required username=XXX password=YYY",
@map(type='json', @attributes(name="name",amount="amount")))
define stream SweetProductionStream(name string, amount double);
Error:
[2020-08-07 03:39:49,461] INFO {org.apache.kafka.clients.consumer.ConsumerConfig} - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [abc.confluent.cloud:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = PLAIN
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = https
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = siddhiGroupId
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = SASL_SSL
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
[2020-08-07 03:39:49,465] ERROR {io.siddhi.core.stream.input.source.Source} - Error on 'kafka-test2'. Error when initiating connection with Kafka server: abc.confluent.cloud:9092 in Siddhi App: kafka-test2 Error while connecting at Source 'kafka' at 'SweetProductionStream'. Will retry in '2 min'. io.siddhi.core.exception.ConnectionUnavailableException: Error when initiating connection with Kafka server: abc.confluent.cloud:9092 in Siddhi App: kafka-test2
at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:283)
at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:56)
at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160)
at io.siddhi.core.stream.input.source.Source$1.run(Source.java:185)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
at io.siddhi.extension.io.kafka.source.KafkaConsumerThread.<init>(KafkaConsumerThread.java:71)
at io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup.<init>(ConsumerKafkaGroup.java:57)
at io.siddhi.extension.io.kafka.source.KafkaSource.connect(KafkaSource.java:261)
... 10 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 15 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:65)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
1 Answer
#1 egmofgnx
As shown, you have to put all of the auth configuration in a JAAS conf file and specify the path to that file through the 'java.security.auth.login.config' optional parameter.
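A minimal sketch of that approach in Java, assuming SASL/PLAIN as in the question. The class and method names (KafkaJaasSetup, jaasEntry, install) and the temp file name are hypothetical; XXX and YYY are the placeholders from the question. The ConsumerConfig dump in the question does not list sasl.jaas.config, which suggests (this is an inference, not something the log confirms) that the bundled Kafka client ignores that property and falls back to the file named by the java.security.auth.login.config system property.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: writes a JAAS file containing the 'KafkaClient'
// entry named in the error message and points the JVM at that file.
class KafkaJaasSetup {

    // Builds the JAAS file text. Assumes the credentials contain no
    // double quotes or backslashes (no escaping is done here).
    public static String jaasEntry(String username, String password) {
        return "KafkaClient {\n"
             + "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
             + "  username=\"" + username + "\"\n"
             + "  password=\"" + password + "\";\n"
             + "};\n";
    }

    // Writes the entry to 'file' and sets the system property that the
    // error message reports as missing.
    public static Path install(Path file, String username, String password)
            throws IOException {
        Files.write(file, jaasEntry(username, password).getBytes(StandardCharsets.UTF_8));
        System.setProperty("java.security.auth.login.config",
                file.toAbsolutePath().toString());
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("kafka_client_jaas", ".conf");
        install(file, "XXX", "YYY");
        // Show the generated file and the property value.
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

The property has to be in place before the Kafka consumer is constructed. In a packaged runtime it is usually simpler to write the file by hand and pass the standard JVM option -Djava.security.auth.login.config=<path to the conf file> at startup; where that option goes for a particular Siddhi distribution is not covered by the answer, so check its launch script.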