下面是我的代码片段和流入使用者的json数据。但是我不知道如何使用这些数据进行计算。
public void sparkKafkaConsumer(JavaStreamingContext jssc,String zkBroker,Map<String, Integer> topicmap, SQLContext sqlContext) throws InterruptedException {
System.out.println("INSIDE SPARK KAFKACONSUMER METHOD..........");
System.out.println("Creating direct kafka stream with brokers and topics..........");
// Create direct kafka stream with brokers and topics
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkBroker,"CustomerKafkaConsumerThread", topicmap,StorageLevel.MEMORY_ONLY());
System.out.println("Computation starts from here");
messages.print();
下面是接收到的json数据。
> (null,{"time":"2017/07/12
> 11:26:47","model":"20optky","speed":"20optky","cellId":"0605d822optky","course":"146.37optky","header":"ST600ALToptky","deviceId":"206675884optky","distance":"166optky","longitude":"-099.168493optky","latitude":"19.428616optky","payload":"ST600ALT+number+;206675884;20;376;20161005;16:26:59;0605d822;334;20;2ee5;63;+19.428616;-099.168493;000.213;146.37;6;1;166;12.21;000000;34;000887;4.4;1;0.00optky","date":"2017/07/12
> 11:26:47optky"})
请注意,我使用的是spark版本1.5.2,因此不支持Dataframe。
暂无答案!
目前还没有任何答案,快来回答吧!