我开发了一个flink程序,可以从twitter上读取推文,并将其推送到Kafka上。然后它从Kafka那里取回推特并处理它们。
“tweets processing”转换从tweet的文本中提取hashtags和用户,并在默认输出中发出它们,在side输出中发出每一对。
附加的图像是从FlinkWebUI中选取的。我不明白为什么kafka源和tweets处理操作符被合并到一个任务中,我主要希望tweets sink接收来自kafka源的所有原始tweets,而不是tweets处理操作符的输出。
程序正确吗?
数据流
这是代码的相关部分:
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(Constants.KAFKA_TWEETS_TOPIC, new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();
DataStream<String> tweetsStream = env
.addSource(myConsumer)
.name("Kafka tweets consumer");
SingleOutputStreamOperator<List<String>> tweetsAggregator = tweetsStream
.timeWindowAll(Time.seconds(7))
.aggregate(new StringAggregatorFunction())
.name("Tweets aggregation");
DataStreamSink tweetsSink = tweetsAggregator.addSink(new TweetsSink())
.name("Tweets sink")
.setParallelism(1);
SingleOutputStreamOperator<String> termsStream = tweetsStream
// extracting terms from tweets
.process(new TweetParse())
.name("Tweets processing");
DataStream<Tuple2<String, Integer>> counts = termsStream
.map(new ToTuple())
// Counting terms
.keyBy(0)
.timeWindow(Time.seconds(13))
.sum(1)
.name("Terms processing");
DataStream<Tuple3<String, String, Integer>> edgesStream = termsStream.getSideOutput(TweetParse.outputTag)
.map(new ToTuple3())
// Counting terms pairs
.keyBy(0, 1)
.timeWindow(Time.seconds(19))
.sum(2)
.name("Edges processing");
1条答案
按热度按时间nukf8bse1#
您正在使用创建两个不同的数据流
tweetsStream
. 首先是tweetsAggregator
第二个是termsStream
. 然后您将再次从termsstream创建两个不同的数据流:counts
以及edgesStream
. sink操作符没有输出。因此,它不能向另一个操作符生成数据,它必须是最后一个要使用的操作符。必须从数据源操作符开始addSource(myConsumer)
,链尽可能多timeWindowAll
,aggregate
,map
,keyBy
,然后呼叫接收器接线员。如果需要,可以调用多个接收器,但请记住,接收器不会向其他操作员生成数据流,它们是使用者。