Java Spark Streaming to Cassandra

t9eec4r0 · posted 2022-11-05 in Cassandra
Follow (0) | Answers (2) | Views (214)

Goal: read from Kafka with Spark Streaming and store the data in Cassandra (Java, Spark Cassandra Connector 1.6). Input data: simple one-line JSON objects such as {"id":"1","field1":"value1"}.
I have a Java class that reads from Kafka via Spark Streaming, processes the data, and then stores it in Cassandra.
Here is the main code:


    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(ssc, targetKafkaServerPort, targetTopic, topicMap);

    JavaDStream<List<Object>> list = messages.map(new Function<Tuple2<String, String>, List<Object>>() {
        public List<Object> call(Tuple2<String, String> tuple2) {
            List<Object> list = new ArrayList<Object>();
            Gson gson = new Gson();
            MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
            myclass.setNewData("new_data");
            String jsonInString = gson.toJson(myclass);
            list.add(jsonInString);
            return list;
        }
    });
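The map above assumes a MyClass POJO that Gson can bind the incoming JSON to. A minimal sketch of such a class (the field names id, field1, and newData are guesses from the sample input and the setNewData call; they are not confirmed by the question) might look like:

```java
// Hypothetical POJO matching the sample input {"id":"1","field1":"value1"}.
// Gson binds JSON keys to these field names by reflection, so the names
// must match the JSON keys (or be mapped with @SerializedName).
public class MyClass {
    private String id;
    private String field1;
    private String newData;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }

    public String getNewData() { return newData; }
    public void setNewData(String newData) { this.newData = newData; }
}
```

Note that for Spark to ship instances of this class to executors it should also implement java.io.Serializable.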

The following code does not work:


    javaFunctions(list)
            .writerBuilder("schema", "table", mapToRow(JavaDStream.class))
            .saveToCassandra();

because the javaFunctions method expects a JavaRDD object, while "list" is a JavaDStream...

I need to convert the JavaDStream into a JavaRDD, but I can't find the right way...

Any help?

brgchamk1#

Use the static imports from com.datastax.spark.connector.japi.CassandraStreamingJavaUtil instead of com.datastax.spark.connector.japi.CassandraJavaUtil — the streaming variant of javaFunctions accepts a JavaDStream directly.
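A sketch of what that looks like, assuming the same keyspace/table names as in the question and a MyClass POJO for the stream elements (the row mapping should use the element class, not JavaDStream):

```java
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// CassandraStreamingJavaUtil.javaFunctions takes a JavaDStream directly,
// so no manual DStream -> RDD conversion is needed.
javaFunctions(dStream)
        .writerBuilder("schema", "table", mapToRow(MyClass.class))
        .saveToCassandra();
```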

quhf5bfb2#

What I did was use foreachRDD after creating the dStream:

    dStream.foreachRDD(new Function<JavaRDD<MyObject>, Void>() {
        @Override
        public Void call(JavaRDD<MyObject> rdd) throws Exception {
            if (rdd != null) {
                javaFunctions(rdd)
                        .writerBuilder("schema", "table", mapToRow(MyObject.class))
                        .saveToCassandra();
                logging(" --> Saved data to cassandra", 1, null);
            }
            return null;
        }
    });
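For reference, Spark 1.6 also offers a non-deprecated foreachRDD overload that takes a VoidFunction, which avoids the "return null" boilerplate. A sketch under the same assumptions (keyspace "schema", table "table", MyObject element class):

```java
import org.apache.spark.api.java.function.VoidFunction;

dStream.foreachRDD(new VoidFunction<JavaRDD<MyObject>>() {
    @Override
    public void call(JavaRDD<MyObject> rdd) throws Exception {
        // Each micro-batch arrives as a JavaRDD, which the core
        // javaFunctions(JavaRDD) helper can write to Cassandra.
        javaFunctions(rdd)
                .writerBuilder("schema", "table", mapToRow(MyObject.class))
                .saveToCassandra();
    }
});
```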

Hope this helps...
