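I am writing a Kafka producer in Scala with Akka, and I am trying to send messages from the Scala Kafka client to the Kafka broker. The problem is that the broker never receives these messages; I verified this by starting a Kafka consumer from the command line. The Kafka console producer and consumer work fine from the command prompt. Kafka is secured with Kerberos and SASL_PLAINTEXT.
Please find my conf file, client code and application log below. I think something must be going wrong when the code connects to Kerberos.
Scala client: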
package com.ABC.adds.producer
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.GMMOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object faqProducer extends App with LazyLogging {

  val config = ConfigFactory.load()
  implicit val system = ActorSystem.create("adds-faq-producer", config)
  implicit val mat = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    .withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
    .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")

  val xstream = new XStream(new DomDriver)
  val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
  xstream.alias("faq", classOf[PPOfaq])
  val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]

  logger.info(s"Producer Configuration is : {} ", producerSettings.toString)
  logger.info(s"Sending message : {}", ppofaq)
  logger.info("KafkaProducer Send first fetching Partitions for topics")

  val kafkaProducer = producerSettings.createKafkaProducer()
  kafkaProducer.partitionsFor("asp.adds.ppo.pems")
  val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
  val recordMetaData : RecordMetadata = done1.get()
  logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset() )
  logger.info("KafkaProdcuer Send first fetching Partitions for topics end")

  val done = Source.single(ppofaq)
    .map { elem =>
      new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)
    }
    .runWith(Producer.plainSink(producerSettings))

  done onComplete {
    case Success(s) => {
      logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!")
    }
    case Failure(e) => {
      logger.error("Erorr occured while producing Topic", e)
      e.printStackTrace()
      e.fillInStackTrace()
      e.getCause
      e.getMessage
    }
  }
}
This is the kafka_client_jaas.conf file I use for Kerberos authentication:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  useKeyTab=true
  serviceName="kafka"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true
  client=true;
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true;
};
This is the application log I get when I run the jar on the cluster:
[asd-sasds-addsk@jbt32423 ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/java/packages/lib/ext/*
14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : akka.kafka.ProducerSettings@35432107
14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
configparser: Reading next config entry: KafkaClient
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: principal=234-dfgd-adds-asd@asdad.ADROOT.ABC
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
configparser: client=true
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=kafka
configparser: Reading next config entry: Client
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: principal=sdfsf-dsf-adds-uk@sfdsdf.ADROOT.ABC
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=zookeeper
Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is dhsjdf-ssdf-adds-usdff@sdfs.ADROOT.ABC tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is usdfsf-ss-adds-ufsdf@sdfs.ADROOT.ABC
Will use keytab
[LoginContext]: login success
Commit Succeeded
[LoginContext]: commit success
14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Please let me know if I am doing anything wrong. Thanks, Mahendra Tonape
1 Answer
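We could not consume messages on the cluster from the consumer side, although we could consume them on a local machine. The cause was that we had written the application against the Kafka 0.10 API, while our cluster was running Kafka 0.9. If you compare these two Kafka versions, you will find significant differences between their APIs.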
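One way to act on the version mismatch described above is to pin the client library to the broker's release line in the build. The following is only a sketch of a build.sbt fragment: the value name kafkaBrokerVersion is hypothetical, and the version "0.9.0.1" is an assumption standing in for whatever 0.9.x release the cluster actually runs.

```scala
// build.sbt fragment (sketch, not the original poster's build file).

// Assumption: the cluster runs a 0.9.0.x broker. Replace with the real broker version.
val kafkaBrokerVersion = "0.9.0.1"

// Use a kafka-clients artifact from the same release line as the broker.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafkaBrokerVersion
```

Other libraries in the build may pull in a different kafka-clients version transitively, so it is worth checking which version actually ends up on the classpath of the assembled jar.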
Also, enable Kerberos debug logging to check whether the user is actually being authenticated against the Kerberos-enabled cluster.
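As a rough sketch of that debugging step: the JDK's Kerberos tracing is controlled by the system property sun.security.krb5.debug, which can be passed as -Dsun.security.krb5.debug=true on the java command line or set in code before any Kerberos class is loaded. The object and method names below (KerberosDebugCheck, enableKerberosDebug, jaasConfigPresent) are hypothetical helpers, not part of Kafka or Akka; the sketch also checks that the file named by java.security.auth.login.config can be read.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper, for illustration only.
object KerberosDebugCheck {

  // Turn on the JDK's Kerberos tracing. Call this before any Kerberos
  // class is loaded, i.e. before the Kafka producer is created.
  def enableKerberosDebug(): Unit = {
    System.setProperty("sun.security.krb5.debug", "true")
    ()
  }

  // True if -Djava.security.auth.login.config is set and points to a readable file.
  def jaasConfigPresent(): Boolean =
    Option(System.getProperty("java.security.auth.login.config"))
      .exists(path => Files.isReadable(Paths.get(path)))

  def main(args: Array[String]): Unit = {
    enableKerberosDebug()
    println(s"sun.security.krb5.debug = ${System.getProperty("sun.security.krb5.debug")}")
    println(s"JAAS config readable: ${jaasConfigPresent()}")
  }
}
```

With tracing on, the JVM prints the Kerberos exchange to standard output, which helps to see whether the client obtains a ticket for the broker's service principal after the keytab login succeeds.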