flink:了解我程序的数据流

lzfw57am  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(613)

我开发了一个flink程序,可以从twitter上读取推文,并将其推送到Kafka上。然后它从Kafka那里取回推特并处理它们。
“tweets processing”转换从tweet的文本中提取hashtags和用户,并在默认输出中发出它们,在side输出中发出每一对。
附加的图像是从FlinkWebUI中选取的。我不明白为什么kafka源和tweets处理操作符被合并到一个任务中,我主要希望tweets sink接收来自kafka源的所有原始tweets,而不是tweets处理操作符的输出。
程序正确吗?
数据流
这是代码的相关部分:

FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(Constants.KAFKA_TWEETS_TOPIC, new SimpleStringSchema(), properties);
    myConsumer.setStartFromLatest();

    DataStream<String> tweetsStream = env
            .addSource(myConsumer)
            .name("Kafka tweets consumer");

    SingleOutputStreamOperator<List<String>> tweetsAggregator = tweetsStream
            .timeWindowAll(Time.seconds(7))
            .aggregate(new StringAggregatorFunction())
            .name("Tweets aggregation");

    DataStreamSink tweetsSink = tweetsAggregator.addSink(new TweetsSink())
            .name("Tweets sink")
            .setParallelism(1);

    SingleOutputStreamOperator<String> termsStream = tweetsStream
            // extracting terms from tweets
            .process(new TweetParse())
            .name("Tweets processing");

    DataStream<Tuple2<String, Integer>> counts = termsStream
            .map(new ToTuple())
            // Counting terms
            .keyBy(0)
            .timeWindow(Time.seconds(13))
            .sum(1)
            .name("Terms processing");

    DataStream<Tuple3<String, String, Integer>> edgesStream = termsStream.getSideOutput(TweetParse.outputTag)
            .map(new ToTuple3())
            // Counting terms pairs
            .keyBy(0, 1)
            .timeWindow(Time.seconds(19))
            .sum(2)
            .name("Edges processing");
nukf8bse

nukf8bse1#

您正在使用创建两个不同的数据流 tweetsStream . 首先是 tweetsAggregator 第二个是 termsStream . 然后您将再次从termsstream创建两个不同的数据流: counts 以及 edgesStream . sink操作符没有输出。因此,它不能向另一个操作符生成数据,它必须是最后一个要使用的操作符。必须从数据源操作符开始 addSource(myConsumer) ,链尽可能多 timeWindowAll , aggregate , map , keyBy ,然后呼叫接收器接线员。如果需要,可以调用多个接收器,但请记住,接收器不会向其他操作员生成数据流,它们是使用者。

相关问题