kafkaproducer

wd2eg0qa  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(196)

我正在尝试创建一个 KafkaProducer 使用 akka-stream-kafka 图书馆。
基础设施
使用docker compose,仅显示kafka和zookeeper示例。

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

我可以报告我已经能够使用 kafka-console-consumer , kafka-console-producer 在cli和restapi上没有问题。
配置
这是我的typesafe配置,我尝试使用纯文本连接连接到客户端。我正在尝试连接到Kafka经纪人没有任何身份验证。

bootstrap.servers="localhost:29092"
acks = "all"
retries = 2
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
max.block.ms = 5000

代码

val config = ConfigFactory.load().getConfig("akka.kafka.producer")
val stringSerializer = new StringSerializer()
ProducerSettings[String, String](config, stringSerializer, stringSerializer)
// some code has been omitted here.
Producer.plainSink(producerSettings)

例外
这是我收到的stacktrace,它告诉我没有 jaas config :

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
    at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
    at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)

如何使用本地运行所需的无身份验证连接到kafka集群?
我试过添加 KAFKA_OPTS 作为docker compose中kafka服务的环境变量,并将其添加到 application.conf .

sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='confluent' password='confluent-secret';"

在前一种情况下,一些相关服务(如kafkarestapi)失败。在后一种情况下,我得到以下例外:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
    at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
    at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:87)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:52)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题