Sending data from a Scala client to a Kerberos-enabled Kafka cluster

Posted by q1qsirdb on 2021-06-08 in Kafka

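I am writing a Kafka producer in Scala with Akka, and I am trying to send messages from the Scala Kafka client to the Kafka broker. The problem is that the broker never receives these messages; I verified this by starting a Kafka consumer from the command line. The Kafka producer and consumer work fine from the command prompt. The Kafka cluster is secured with Kerberos and uses the SASL_PLAINTEXT security protocol.
Please find my conf file, client code, and application log below. I think there must be a problem with how the code connects to Kerberos.
Scala client: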

package com.ABC.adds.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.PPOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object faqProducer extends App with LazyLogging{

  val config = ConfigFactory.load()
  implicit val system = ActorSystem.create("adds-faq-producer", config)
  implicit val mat = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    .withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
    .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")

  val xstream = new XStream(new DomDriver)
  val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
  xstream.alias("faq", classOf[PPOfaq])
  val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]

  logger.info(s"Producer Configuration is : {} ", producerSettings.toString)
  logger.info(s"Sending message : {}", ppofaq)

  logger.info("KafkaProducer Send first fetching Partitions for topics")
  val  kafkaProducer  = producerSettings.createKafkaProducer()
  kafkaProducer.partitionsFor("asp.adds.ppo.pems")
  val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
  val recordMetaData : RecordMetadata = done1.get()

  logger.info("Topic is :  " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset() )

  logger.info("KafkaProducer Send first fetching Partitions for topics end")

  val done = Source.single(ppofaq)
    .map { elem =>
      new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)
    }
    .runWith(Producer.plainSink(producerSettings))

  done onComplete {
    case Success(s) => {
      logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!")
    }
    case Failure(e) => {
      logger.error("Error occurred while producing Topic", e)
      e.printStackTrace()
      e.fillInStackTrace()
      e.getCause
      e.getMessage
    }
  }
}

This is the kafka_client conf file I use for Kerberos authentication:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  useKeyTab=true
  serviceName="kafka"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true
  client=true;
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true;
};

This is the application log I get when I run the jar on the cluster:

[asd-sasds-addsk@jbt32423 ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
            [Policy Parser]: creating policy entry for expanded java.ext.dirs path:
                    file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
            [Policy Parser]: creating policy entry for expanded java.ext.dirs path:
                    file:/usr/java/packages/lib/ext/*
    14:44:56.520 [main] INFO  c.h.adds.producer.addsProducer$ - Producer Configuration is : akka.kafka.ProducerSettings@35432107
    14:44:56.523 [main] INFO  c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
    14:44:56.524 [main] INFO  c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
    configparser:   Reading next config entry: KafkaClient
    configparser:           com.sun.security.auth.module.Krb5LoginModule, REQUIRED
    configparser:                   principal=234-dfgd-adds-asd@asdad.ADROOT.ABC
    configparser:                   debug=true
    configparser:                   doNotPrompt=true
    configparser:                   keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
    configparser:                   client=true
    configparser:                   useKeyTab=true
    configparser:                   useTicketCache=false
    configparser:                   serviceName=kafka
    configparser:   Reading next config entry: Client
    configparser:           com.sun.security.auth.module.Krb5LoginModule, REQUIRED
    configparser:                   principal=sdfsf-dsf-adds-uk@sfdsdf.ADROOT.ABC
    configparser:                   debug=true
    configparser:                   doNotPrompt=true
    configparser:                   keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
    configparser:                   useKeyTab=true
    configparser:                   useTicketCache=false
    configparser:                   serviceName=zookeeper
    Debug is  true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is dhsjdf-ssdf-adds-usdff@sdfs.ADROOT.ABC tryFirstPass is false useFirstPass is false storePass is false clearPass is false
    principal is usdfsf-ss-adds-ufsdf@sdfs.ADROOT.ABC
    Will use keytab
            [LoginContext]: login success
    Commit Succeeded

            [LoginContext]: commit success
    14:44:56.748 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
    Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Please let me know if I am doing something wrong. Thanks, Mahendra Tonape

jhdbpxl9 #1

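We were unable to consume messages on the cluster from the consumer side, although we could consume them on our local machine. The cause was that we had written the application against the Kafka 0.10 API while our cluster was running Kafka 0.9. If you compare these two Kafka versions, you will find significant differences between their APIs.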
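To confirm which client library version actually ends up on the classpath of the jar deployed to the cluster, you can ask the client itself. The following is only a minimal sketch and not part of the original answer. It assumes the `kafka-clients` jar provides the static method `org.apache.kafka.common.utils.AppInfoParser.getVersion()`, which is looked up reflectively so that the snippet compiles without the dependency. `ClientVersionCheck` and `kafkaClientVersion` are illustrative names.

```scala
import scala.util.Try

object ClientVersionCheck {
  // Hypothetical helper: returns the version reported by the Kafka client
  // library if the given class is on the classpath, None otherwise.
  // Assumption: the class exposes a public static no-arg getVersion().
  def kafkaClientVersion(
      className: String = "org.apache.kafka.common.utils.AppInfoParser"
  ): Option[String] =
    Try {
      val clazz = Class.forName(className)
      clazz.getMethod("getVersion").invoke(null).asInstanceOf[String]
    }.toOption.flatMap(Option(_)) // a null result is treated as "unknown"

  def main(args: Array[String]): Unit =
    kafkaClientVersion() match {
      case Some(v) => println(s"kafka-clients version on classpath: $v")
      case None    => println("kafka-clients not found on classpath")
    }
}
```

Run it with the same classpath as the producer jar and compare the printed value with the version of the brokers in the cluster.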
Also, enable Kerberos debug logging to check whether the user is actually being authenticated against the Kerberos-enabled cluster.
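As a rough sketch of what enabling that logging could look like from code, the snippet below sets the relevant JVM system properties before any Kafka client is created. `sun.security.krb5.debug` is the JDK's Kerberos debug switch, and the other two properties are the ones already passed with `-D` in the question's command line. `KerberosDebug` and `enable` are illustrative names, and the file paths are taken from the question and would need to match your environment.

```scala
object KerberosDebug {
  // Illustrative helper: applies the JVM settings and returns them so the
  // caller can log what was set. Call it before the first Kerberos login,
  // i.e. before any Kafka producer or consumer is created.
  def enable(jaasPath: String, krb5Path: String): Map[String, String] = {
    val settings = Map(
      "java.security.auth.login.config" -> jaasPath, // JAAS file with the KafkaClient entry
      "java.security.krb5.conf"         -> krb5Path, // Kerberos realm/KDC configuration
      "sun.security.krb5.debug"         -> "true"    // protocol-level Kerberos debug output
    )
    settings.foreach { case (k, v) => System.setProperty(k, v) }
    settings
  }

  def main(args: Array[String]): Unit =
    enable("kafka_client_jaas.conf", "/etc/krb5.conf")
      .foreach { case (k, v) => println(s"$k=$v") }
}
```

Passing `-Dsun.security.krb5.debug=true` on the `java` command line has the same effect and avoids any ordering concerns about when the property is read.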
