I am using Spark Streaming v2.0.0 and Kafka v0.8.2.1.
I am trying to connect Spark Streaming to a remote Kafka server from Eclipse. To create the topic, I followed the guide at http://kafka.apache.org/documentation.html#quickstart and then wrote the following code, based on the example shipped with Spark Streaming:
package sparkstreamingtest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class test1 {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "10.66.125.130:9092"; // args[0]: Kafka brokers as host:port
        String topics = "test2";               // args[1]: Kafka topics, comma-separated: topic1,topic2,...

        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        // Split the topic list and put the topics into a single set
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        System.out.println(topicsSet);

        // Same for the broker hosts
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        System.out.println(kafkaParams);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        lines.print(); // at least one output operation is required before start()

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
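For reference, the splitting/counting part that the comment above mentions continues in the original JavaDirectKafkaWordCount example roughly as below; it would replace the lines.print() call. I left it out while debugging, since the job fails earlier, inside createDirectStream. It needs extra imports (java.util.Iterator, java.util.regex.Pattern, org.apache.spark.api.java.function.FlatMapFunction, PairFunction and Function2, org.apache.spark.streaming.api.java.JavaPairDStream) plus a class-level field for the tokenizer pattern:

    // Tokenizer used by the example (declared as a static field of the class)
    private static final Pattern SPACE = Pattern.compile(" ");

        // Split each line into words (in Spark 2.x, flatMap returns an Iterator)
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) {
                return Arrays.asList(SPACE.split(line)).iterator();
            }
        });

        // Count each word per batch and print the counts
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        wordCounts.print();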
I do not understand the following error:
16/08/19 10:53:58 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test2,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at sparkstreamingtest.test1.main(test1.java:64)
16/08/19 10:53:59 INFO SparkContext: Invoking stop() from shutdown hook
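From what I understand, createDirectStream first asks the broker for topic metadata in order to find the leader of each partition, and that request is what fails here. To isolate the problem from Spark, here is a minimal standalone sketch of that metadata handshake using the Kafka 0.8 SimpleConsumer API, in a hypothetical MetadataCheck class (same broker and topic as above; the soTimeout, bufferSize and "leaderLookup" clientId values are arbitrary):

package sparkstreamingtest;

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataCheck {
    public static void main(String[] args) {
        // Open a blocking connection to the broker (soTimeout ms, bufferSize bytes, clientId)
        SimpleConsumer consumer = new SimpleConsumer("10.66.125.130", 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            // Ask the broker which broker is the leader for each partition of the topic
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList("test2"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    // leader() returns null when no leader is known for the partition
                    System.out.println("partition " + partition.partitionId()
                            + " -> leader: " + partition.leader());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

If this sketch throws the same ClosedChannelException, the problem presumably lies between my machine and the broker (network reachability, or the host name the broker advertises) rather than in the Spark code itself.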
I could not solve it even after following these earlier questions:
spark streaming + kafka: SparkException: Couldn't find leader offsets for Set
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set() - Spark Streaming Kafka
Can't connect to Kafka from Spark Streaming: org.apache.spark.SparkException: java.net.SocketTimeoutException
How to manually commit offset in Spark Kafka direct streaming?
Can anyone help me solve this problem?
Thank you for your attention and help.
Joe