Kafka/Spark Streaming approach

Posted by x6h2sr28 on 2021-06-07 in Kafka

This question may be a duplicate.
I want to listen to a Kafka topic from Spark and pass the content to an Ignite cache. I would like to implement the same functionality described in "Performance Tuning of an Apache Kafka/Spark Streaming System".
I use KafkaUtils.createDirectStream() to read the Kafka topic in Spark, and IgniteRDD to push the data into the Ignite cache. However, the system throws the following error:

org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

The code is as follows:

public static void main(String[] args) throws Exception {

    SparkConf conf = new SparkConf()
            .setAppName("kafka-sandbox")
            .setMaster("local[*]");
    conf.set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Context for Kafka
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
    // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
    IgniteContext igniteContext = new IgniteContext(
            sc.sc(), "/home/ec2-user/apache-ignite-fabric-2.6.0-bin/config/default-config.xml", false);

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger().setLevel(Level.ERROR);
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

    // Define data to be stored in the Ignite RDD (cache).
    List<Integer> data = new ArrayList<>(20);

    for (int i = 0; i < 20; i++) {
        data.add(i);
    }

    Set<String> topics = Collections.singleton("Hello-Kafka");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "10.0.102.251:9092");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
            String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {

        // Create a Java Ignite RDD of type (Int, Int) Integer pair.
        IgniteRDD sharedRDD = igniteContext.fromCache("hello-spark");
        // Prepare a Java RDD.
        JavaRDD<String> javaRDD = sc.parallelize(Collections.singletonList("Hello-world"));
        System.out.println("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> {
            // Display the Kafka record.
            System.out.println("Got the record : " + record._2);
            // Push values to Ignite.
            sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
                @Override
                public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                    return new Tuple2<Integer, Integer>(val, val);
                }
            }));
        });
    });

    ssc.start();
    ssc.awaitTermination();
}

I can't figure out what is missing in the code. Is this approach correct, or should I use a different one? Please advise.

3okqufwl  #1

Your example can be reduced to the following code:

JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> javaRDD = sparkCtx.parallelize(Arrays.asList(4, 5, 6));
JavaIgniteRDD<Integer, Integer> sharedRDD = igniteCtx.fromCache("hello-spark");

rdd.foreach(record ->
    sharedRDD.savePairs(
        javaRDD.mapToPair((PairFunction<Integer, Integer, Integer>)val ->
            new Tuple2<>(val, val))
    ));

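To keep the example simple, I took Kafka out of the equation.
First of all, it is odd that you iterate over rdd and put javaRDD into sharedRDD while ignoring the rdd records, given that rdd and javaRDD are different things. I don't understand why you do that.
You get the exception because you run the mapToPair operation inside foreach. Both are RDD operations, and RDD operations cannot be nested: the body of foreach runs on the executors, where no SparkContext is available to launch another RDD operation. You should either move the savePairs part out of foreach, or combine rdd and javaRDD in a way that does not require running nested RDD operations. It depends on what you actually want to achieve.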
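The restructuring suggested above can be sketched in plain Java so that it runs without a cluster. This is only a model of the shape of the fix, not the Spark or Ignite API: PairSink is a hypothetical stand-in for the Ignite shared RDD, a List<String> stands in for one micro-batch, and the key == value mapping is an assumption carried over from the (val, val) mapping in the question. The point it illustrates is that the pairs are derived from the batch's own records and saved with a single top-level call, instead of calling savePairs from inside the per-record loop.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlatSaveSketch {

    /** Hypothetical stand-in for the Ignite shared RDD: it accepts a whole batch of pairs. */
    public static class PairSink {
        public final Map<String, String> cache = new LinkedHashMap<>();
        public int saveCalls = 0;

        public void savePairs(List<Map.Entry<String, String>> pairs) {
            saveCalls++;
            for (Map.Entry<String, String> pair : pairs) {
                cache.put(pair.getKey(), pair.getValue());
            }
        }
    }

    /**
     * Handles one micro-batch: first derive the pairs from the batch's own records,
     * then issue one save at the top level. Nothing is saved from inside the loop.
     */
    public static void handleBatch(List<String> records, PairSink sink) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (String value : records) {
            // Assumption: key == value, mirroring the (val, val) mapping in the question.
            pairs.add(new SimpleEntry<>(value, value));
        }
        sink.savePairs(pairs);
    }

    public static void main(String[] args) {
        PairSink sink = new PairSink();
        handleBatch(Arrays.asList("a", "b", "c"), sink);
        System.out.println(sink.cache + " saved in " + sink.saveCalls + " call(s)");
    }
}
```

In the real job, the corresponding change would presumably be to call savePairs directly in the foreachRDD body, on a pair RDD derived from rdd, rather than from inside rdd.foreach. For example, if sharedRDD is a JavaIgniteRDD<String, String> obtained from a JavaIgniteContext, sharedRDD.savePairs(rdd) would be the analogous call. Whether the cache should hold the raw Kafka key and value or something derived from them depends on what you want to store.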
