postgresql—如何处理spark consumer传入的java流json数据

aurhwmvo  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(256)

下面是我的代码片段和流入使用者的json数据。但是我不知道如何使用这些数据进行计算。

public void sparkKafkaConsumer(JavaStreamingContext jssc,String zkBroker,Map<String, Integer> topicmap, SQLContext sqlContext) throws InterruptedException  {
    System.out.println("INSIDE SPARK KAFKACONSUMER METHOD..........");

    System.out.println("Creating direct kafka stream with brokers and topics..........");
    // Create direct kafka stream with brokers and topics
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkBroker,"CustomerKafkaConsumerThread", topicmap,StorageLevel.MEMORY_ONLY());

    System.out.println("Computation starts from here");
    messages.print();

下面是接收到的json数据。

> (null,{"time":"2017/07/12
> 11:26:47","model":"20optky","speed":"20optky","cellId":"0605d822optky","course":"146.37optky","header":"ST600ALToptky","deviceId":"206675884optky","distance":"166optky","longitude":"-099.168493optky","latitude":"19.428616optky","payload":"ST600ALT+number+;206675884;20;376;20161005;16:26:59;0605d822;334;20;2ee5;63;+19.428616;-099.168493;000.213;146.37;6;1;166;12.21;000000;34;000887;4.4;1;0.00optky","date":"2017/07/12
> 11:26:47optky"})

请注意,我使用的是spark版本1.5.2,因此不支持Dataframe。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题