I am trying to run a Spark Streaming application with Kafka on YARN. I am getting the following stack trace error -
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.<init>(KafkaRDD.scala:252)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
Below is the code snippet showing how I create the kafkaStream with Spark Streaming -
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "*boorstrap_url:port*",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "annotation-test",
// Tried both commenting and uncommenting this property
//"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("*topic-name*")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())
I have looked at the following posts -
https://issues.apache.org/jira/browse/kafka-4547
PySpark structured streaming Kafka configuration error
Based on these, I have updated the kafka util jar in my fat jar to version 0.10.2.0, replacing the one that is packaged by default as a transitive dependency of the spark-streaming-kafka jar. Also, my job works fine when I run it on a single node by setting master to local. I am running Spark version 2.3.1.
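For reference, one way to pin the kafka-clients version in the fat jar is to declare it explicitly in the build definition. This is only a minimal sketch assuming an sbt build (the question does not say which build tool is used); the versions follow the ones mentioned above:

// Minimal build.sbt sketch (assuming sbt): declare kafka-clients explicitly
// so the fat jar does not rely on the transitive version pulled in by
// spark-streaming-kafka-0-10.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1",
  "org.apache.kafka" % "kafka-clients" % "0.10.2.0"
)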
1 Answer
Add kafka-clients-*.jar to your Spark jars folder. The kafka-clients-*.jar can be found in the kafka-*/lib directory of your Kafka installation.
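If copying into the Spark installation is not an option, an alternative sketch is to ship the jar with the application via spark-submit --jars; the path, main class, and application jar below are placeholders:

# Sketch: ship kafka-clients with the job instead of copying it into Spark's jars folder.
# /path/to/kafka-clients-0.10.2.0.jar, com.example.StreamingApp and my-streaming-app.jar are placeholders.
spark-submit \
  --master yarn \
  --jars /path/to/kafka-clients-0.10.2.0.jar \
  --class com.example.StreamingApp \
  my-streaming-app.jar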