This question may be a duplicate.
I want to listen to a Kafka topic from Spark and then pass the contents into an Ignite cache. I would like to achieve the same functionality described in "Performance Tuning of an Apache Kafka/Spark Streaming System".
I use KafkaUtils.createDirectStream() to read the Kafka topic in Spark, and IgniteRDD to push the data into the Ignite cache. But the system throws the following error:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
The code is as follows:
public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setAppName("kafka-sandbox")
        .setMaster("local[*]");
    conf.set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Context for Kafka
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
    IgniteContext igniteContext = new IgniteContext(
        sc.sc(), "/home/ec2-user/apache-ignite-fabric-2.6.0-bin/config/default-config.xml", false);

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger().setLevel(Level.ERROR);
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

    // Define data to be stored in the Ignite RDD (cache).
    List<Integer> data = new ArrayList<>(20);
    for (int i = 0; i < 20; i++) {
        data.add(i);
    }

    Set<String> topics = Collections.singleton("Hello-Kafka");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "10.0.102.251:9092");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {
        // Create a Java Ignite RDD of (Integer, Integer) pairs.
        IgniteRDD<Integer, Integer> sharedRDD = igniteContext.fromCache("hello-spark");
        // Preparing a Java RDD
        JavaRDD<Integer> javaRDD = sc.parallelize(data);
        System.out.println("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> {
            // Displaying the Kafka record
            System.out.println("Got the record : " + record._2);
            // Pushing values to Ignite
            sharedRDD.savePairs(javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
                @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                    return new Tuple2<>(val, val);
                }
            }));
        });
    });

    ssc.start();
    ssc.awaitTermination();
}
I cannot figure out what is missing in the code. Is this approach correct, or should I use a different one? Please guide me on this.
1 Answer
Your example can be simplified; to make it simpler, I have taken Kafka out of the equation.
First of all, it is strange that you iterate over rdd but then save javaRDD into sharedRDD, ignoring the rdd records entirely. rdd and javaRDD are different things, and I don't see why you do that. You get the exception because you run the mapToPair operation inside the foreach action. Both of them are RDD operations, and they cannot be nested. Either move the savePairs part out of foreach, or combine rdd and javaRDD in a way that does not require running nested RDD operations. It depends on what you are really trying to achieve.
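Following that advice, one possible restructuring is sketched below. This is a minimal, untested sketch that assumes the goal is to store the streamed Kafka (key, value) records themselves in the cache; the cache name "hello-spark" is taken from the question, and the String/String pair types are an assumption based on the stream's declared types:

```java
// Sketch: avoid nesting RDD operations. foreachRDD itself runs on the driver,
// so calling savePairs directly here is legal. The original code called
// savePairs inside rdd.foreach, which executes on the executors and therefore
// triggers the SPARK-5063 "This RDD lacks a SparkContext" exception.
directKafkaStream.foreachRDD(rdd -> {
    // Obtain the Ignite-backed pair RDD for this batch (driver side).
    IgniteRDD<String, String> sharedRDD = igniteContext.fromCache("hello-spark");
    // The direct stream already yields (key, value) pairs, so the batch can be
    // saved as-is; no inner foreach, no separately built javaRDD, no mapToPair.
    sharedRDD.savePairs(rdd.rdd());
});
```

If the per-record println from the original code is still wanted, keep a separate rdd.foreach for the printing only, with no RDD operations inside the lambda.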