从java应用程序打印到控制台

yrwegjxp  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(500)

我正在运行ubuntu18,使用java(intellijide)编写一个基本的kafka应用程序。我试图使用这里的基本示例,并将信息流式传输到应用程序,然后将一些内容打印到屏幕上,我正在使用intellij“run”命令运行应用程序。
当我将应用程序连接到一个输出流时,它工作正常,我设法将信息输出到终端。
我试着添加 System.out.println() 在foreach方法中,在apply方法中,它不起作用,我在它们内部添加断点并运行debug模式,它没有到达那里,我猜流在运行期间没有到达那里。
我正在使用适当的主题将信息流式传输到应用程序中,并且在apply和foreach之外的应用程序中输入的打印效果良好。
如何让应用程序在每次向其传输信息时打印一些内容?其主要思想是处理一些东西并将结果打印到监视器上,而不是打印到Kafka流上
代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Printed;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class main {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");

        source.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                System.out.println("yeah");
            }
        });

        KTable<String, Long> counts = source
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                })
                .groupBy(new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        System.out.println("what");
                        return value;
                    }
                })
                .count();
        //System.exit(0);
        // need to override value serde to Long type
        System.out.println("what");
        //counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(2);
        }
        System.exit(0);
    }

}
yyyllmsg

yyyllmsg1#

kafkastreams提供输出控制台打印dsl

KStream<String, String> source = builder.stream("streams-plaintext-input");

source.print(Printed.toSysOut());
KTable<String, Long> counts = source
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
...

试验结果

./kafka控制台生产者--代理列表localhost:9092 --topic 流明文输入

input1
input2

intellij控制台结果

[KSTREAM-SOURCE-0000000000]: null, input1
[KSTREAM-SOURCE-0000000000]: null, input2

相关问题