使用spark流之后没有输出

yzckvree  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(339)
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));

JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
    StringDecoder.class, kafkaParams, topicsSet)
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
      @Override
      public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
        rdd.saveAsTextFile("output");
        return rdd;
      }
    }).map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> kv) {
        return kv._2();
      }
    });
stream1.print();
jssc.start();
jssc.awaitTermination();

交叉检查主题“test4”中是否有有效数据。

我期望从kafka集群流式传输的字符串在控制台中打印。控制台中没有异常,但也没有输出。我有什么遗漏吗?

fhity93d

fhity93d1#

在流应用程序启动后,您是否尝试在主题中生成数据?
默认情况下,direct stream使用配置auto.offset.reset=largest,这意味着当没有初始偏移量时,它会自动重置为最大偏移量,因此基本上,在流应用程序启动后,您将只能读取在主题中输入的新消息。

crcmnpdw

crcmnpdw2#

正如切内森所说,可能是因为你失踪了 .start() 以及 .awaitTermination() 也可能是因为spark中的转换是懒惰的,这意味着您需要添加一个操作来获得结果。例如

stream1.print();

也可能是因为 map 正在对执行器执行,因此输出将在执行器日志中,而不是在驱动程序日志中。

相关问题