无法找到具有kafka spark集成的集合([topic,0])的引线

qjp7pelc  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我正在尝试使用ssl的KafkaSpark集成。我已经测试了启用ssl的kafka,它可以很好地与示例消费者和生产者一起工作。
此外,我还尝试了spark-kafka的集成,当spark作业中没有ssl时,它也可以正常工作。
现在,当我在spark作业中启用ssl时,我得到了一个异常,集成无法工作。
我为在spark作业中启用ssl所做的唯一更改是在我的作业中包含以下代码行:

sparkConf.set("security.protocol", "SSL");
    sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
    sparkConf.set("ssl.truststore.password", "passwrd");
    sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
    sparkConf.set("ssl.keystore.password", "kstore");
    sparkConf.set("ssl.key.password", "keypass");

这个sparkconf是在创建流上下文时传递的。

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

当我运行作业时,得到的错误如下:

17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
    at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
    at kafka.cluster.Broker.connectionString(Broker.scala:62)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Kafka版本-2.11-0.10.2.0
spark版本-2.1.0
scala版本-2.11.8
流媒体库

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

关于克服这个问题有什么帮助吗?

kg7wmglp

kg7wmglp1#

经过深入研究,我终于弄清楚了我的问题所在。
首先,为了启用与ssl相关的ssl,kafka参数需要传递到kafkautils.createdirectstream()方法,而不是javastreamingcontext的sparkconf。
然后,给定的ssl参数:

"security.protocol", "SSL"
"ssl.truststore.location", "PATH/truststore.jks"
"ssl.truststore.password", "passwrd"
"ssl.keystore.location", "PATH/keystore.jks"
"ssl.keystore.password", "kstore"
"ssl.key.password", "keypass"

我使用的spark kafka流媒体版本“0-8_2.11”不支持,因此我不得不将其更改为版本“0-10_2.11”。
作为回报,它对方法有一个完整的api更改:kafkautils.createdirectstream(),用于连接到kafka。
文档中给出了如何在这里使用它的说明。
因此,我连接到Kafka的最后一段代码如下所示:

final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    javaStreamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams)
            );

kafka params是一个包含所有ssl参数的Map。
谢谢
寒酸的

相关问题