Spark Streaming + Kafka: Missing required configuration "partition.assignment.strategy" which has no default value

o8x7eapl · posted 2021-06-06 in Kafka

I am trying to run a Spark Streaming application with Kafka on YARN. I am getting the following stack trace error -
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:45)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:194)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.<init>(KafkaRDD.scala:252)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:212)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
Here is the code snippet showing how I create the Kafka stream with Spark Streaming -

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "*boorstrap_url:port*",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "annotation-test",
  //Tried commenting and uncommenting this property      
  //"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("*topic-name*")

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())

I have looked at the following posts -
https://issues.apache.org/jira/browse/kafka-4547
Pyspark Structured Streaming Kafka configuration error
Based on these, I have updated the Kafka client jar inside my fat jar to version 0.10.2.0; by default it is packaged as a transitive dependency of the spark-streaming-kafka jar. Also, my job works fine when I run it on a single node with master set to local. I am running Spark 2.3.1.
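For reference, the kind of dependency override described above would look roughly like this in an sbt build (this is only a sketch, not my exact build file; the versions follow the ones mentioned above and the scopes are assumptions):

// Sketch of the dependency override described above (sbt syntax; exact versions
// and scopes are assumptions -- adjust them to your build).
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1",
  // Pin the newer client explicitly instead of relying on the older
  // kafka-clients version pulled in transitively by the connector.
  "org.apache.kafka" %  "kafka-clients"              % "0.10.2.0"
)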


mklgxw1f1#

Add kafka-clients-*.jar to your Spark jars folder. kafka-clients-*.jar can be found in the kafka-*/lib directory of your Kafka installation.
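If copying the jar into the Spark jars folder on every node is not convenient, roughly the same effect can usually be achieved by shipping the jar with the application itself. A minimal sketch, assuming kafka-clients 0.10.2.0 and a placeholder path/app name (not from the original question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Ship kafka-clients with the job so the YARN executors see the right version.
// "spark.jars" adds the listed jars to both the driver and executor classpaths.
val conf = new SparkConf()
  .setAppName("kafka-streaming-app")                               // placeholder name
  .set("spark.jars", "/opt/kafka/libs/kafka-clients-0.10.2.0.jar") // assumed path

val ssc = new StreamingContext(conf, Seconds(60))

The more common route is to pass the same jar to spark-submit with --jars, or, as above, to drop it into the Spark jars directory on each node.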
