目标:阅读Kafka与Spark流和存储数据在cassandra Java Spark cassandra连接器1.6数据输入:简单json行对象{“id”:“1”,“field 1”:“value 1}
我有一个java类,通过spark streaming读取Kafka的作品,处理读取的数据,然后将其存储在cassandra中。
下面是主要代码:
**JavaPairReceiverInputDStream**<String, String> messages =
KafkaUtils.createStream(ssc,
targetKafkaServerPort, targetTopic, topicMap);
**JavaDStream**list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){
public List<Object> call( Tuple2<String,String> tuple2){
List<Object>**list**=new ArrayList<Object>();
Gson gson = new Gson();
MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
myclass.setNewData("new_data");
String jsonInString = gson.toJson(myclass);
list.add(jsonInString);
return list;
}
});
下一个代码不正确:
**javaFunctions**(list)
.writerBuilder("schema", "table", mapToRow(JavaDStream.class))
.saveToCassandra();
因为“javaFunctions”方法需要JavaRDD对象,而“list”是JavaDStream...
我需要将JavaDStream转换为JavaRDD,但我找不到正确的方法...
有什么帮助吗?
2条答案
按热度按时间brgchamk1#
让我们使用导入静态的com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.,而不是com.datastax.spark.connector.japi.CassandraJavaUtil.。
quhf5bfb2#
我所做的是在创建dsStream后使用foreachRDD:
希望对你有用...