我正在尝试创建一个 KafkaProducer
使用 akka-stream-kafka
图书馆。
基础设施
使用docker compose,仅显示kafka和zookeeper示例。
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-enterprise-kafka:5.1.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
我可以报告我已经能够使用 kafka-console-consumer
, kafka-console-producer
在cli和restapi上没有问题。
配置
这是我的typesafe配置,我尝试使用纯文本连接连接到客户端。我正在尝试连接到Kafka经纪人没有任何身份验证。
bootstrap.servers="localhost:29092"
acks = "all"
retries = 2
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
max.block.ms = 5000
代码
val config = ConfigFactory.load().getConfig("akka.kafka.producer")
val stringSerializer = new StringSerializer()
ProducerSettings[String, String](config, stringSerializer, stringSerializer)
// some code has been omitted here.
Producer.plainSink(producerSettings)
例外
这是我收到的stacktrace,它告诉我没有 jaas config
:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
如何使用本地运行所需的无身份验证连接到kafka集群?
我试过添加 KAFKA_OPTS
作为docker compose中kafka服务的环境变量,并将其添加到 application.conf
.
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='confluent' password='confluent-secret';"
在前一种情况下,一些相关服务(如kafkarestapi)失败。在后一种情况下,我得到以下例外:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:87)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:52)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
暂无答案!
目前还没有任何答案,快来回答吧!