java—如何使用spark获取来自kafka的数据

m1m5dgzv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(329)

我用spark job编写了下面的代码来使用数据,
Kafka流媒体或者在获取数据后处理数据是否丢失了什么?如何检索测试数据?

// StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
        ;
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("Ptopic", 1);

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
                topicMap);

        /*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
            public Void call(JavaRDD<String, String> accessLogs) {
                  return null;
                }}
              );*/

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                /*System.out.println(tuple2._1().toString());
                System.out.println(tuple2._2().toString());*/
                return tuple2._2();
            }
        });

        lines.print();
        jssc.start();
        jssc.awaitTermination();

结果是打印出来的。。

wyyhbhjk

wyyhbhjk1#

您可以使用基本的大数据功能,如map、reduce、flatmap等。
更新1:

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            /*System.out.println(tuple2._1().toString());
            System.out.println(tuple2._2().toString());*/
            return tuple2._2();
        }
    });

    // TODO: make some transformation here:
    lines = lines.map(x -> { // clean data
        String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); // here some operations
        x.setCallType(callType);
        return x;
    }).filter(pair -> { // filter data
        return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); // here so filters
    });

    lines.print();
    jssc.start();
    jssc.awaitTermination();

完整的例子在这篇博文中有描述

相关问题