Connecting to Kafka from Eclipse: Error: Couldn't find leader offsets

fcipmucu · published 2021-06-08 · in Kafka

I am using Spark Streaming v2.0.0 and Kafka v0.8.2.1.
I am trying to connect Spark Streaming to a remote Kafka server from Eclipse. To create the topic I followed this guide: http://kafka.apache.org/documentation.html#quickstart. I then wrote the following code, based on the example shipped with Spark Streaming:
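For reference, the quickstart linked above creates topics with the kafka-topics.sh script shipped with Kafka 0.8. A sketch, assuming ZooKeeper runs on localhost:2181 (adjust host and topic name to your setup):

```shell
# Create the "test2" topic with a single partition and no replication
# (assumes ZooKeeper on localhost:2181; point --zookeeper at your cluster otherwise).
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test2

# Verify the topic exists and that its partition has a leader assigned.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
```

The --describe output is worth checking here: if the partition shows `Leader: -1`, the "Couldn't find leader offsets" error below is expected.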

package sparkstreamingtest;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class test1 {
public static void main(String[] args) throws InterruptedException {   
    String brokers = "10.66.125.130:9092";//args[0]; broker list as host:port
    String topics = "test2";//args[1]; comma-separated Kafka topics: topic1,topic2,...

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));// split the comma-separated topic list into a set
    System.out.println(topicsSet);
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);// same for the broker hosts
    System.out.println(kafkaParams);
    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
    // Extract the message values from the (key, value) pairs and print them
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    lines.print();
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}

I do not understand the following error:

16/08/19 10:53:58 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test2,0])
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
    at scala.util.Either.fold(Either.scala:98)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at sparkstreamingtest.test1.main(test1.java:64)
16/08/19 10:53:59 INFO SparkContext: Invoking stop() from shutdown hook
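The ClosedChannelException preceding "Couldn't find leader offsets" usually means the driver cannot open (or keep) a TCP connection to the broker, either because the port is blocked or because the broker advertises an unreachable advertised.host.name in server.properties. A minimal stdlib-only sketch to rule out basic connectivity (the host and port are the ones from the question; the class and method names are introduced here for illustration):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    // Returns true if a plain TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker address taken from the question; replace with your own.
        if (isReachable("10.66.125.130", 9092, 3000)) {
            System.out.println("TCP connection to broker OK");
        } else {
            System.out.println("Cannot reach broker: check firewall and advertised.host.name");
        }
    }
}
```

If the socket connects but Spark still fails, check that the hostname the broker advertises (advertised.host.name in Kafka 0.8) resolves from the machine running the driver, and that the topic's partition actually has a leader.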

I could not solve it even after following these earlier questions:
Spark Streaming + Kafka: SparkException: Couldn't find leader offsets for Set
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set() - Spark Steaming Kafka
Cannot connect to Kafka from Spark Streaming: org.apache.spark.SparkException: java.net.SocketTimeoutException
How to manually commit offsets in Spark Kafka Direct Streaming?
Can anyone help me solve this problem?
Thank you for your attention and help.
