我用spark job编写了下面的代码来使用数据,
Kafka流媒体或者在获取数据后处理数据是否丢失了什么?如何检索测试数据?
// StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
;
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("Ptopic", 1);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
topicMap);
/*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
public Void call(JavaRDD<String, String> accessLogs) {
return null;
}}
);*/
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
/*System.out.println(tuple2._1().toString());
System.out.println(tuple2._2().toString());*/
return tuple2._2();
}
});
lines.print();
jssc.start();
jssc.awaitTermination();
结果是打印出来的。。
1条答案
按热度按时间wyyhbhjk1#
您可以使用基本的大数据功能,如map、reduce、flatmap等。
更新1:
完整的例子在这篇博文中有描述