我创建了一个简单的flink应用程序,它从kafka主题读取日志,在一个持续时间为2秒、水印为1秒的窗口中聚合日志。然后我将结果发送到另一个Kafka主题。应用程序部署在集群上。
我这样消费Kafka:
Map<String, String> parameters = new HashMap<String, String>();
parameters.put("bootstrap.servers", bootstrapServers);
parameters.put("group.id", groupId);
parameters.put("zookeeper.connect", zookeeperConnect);
parameters.put("topic", inputTopic);
parameters.put("auto.offset.reset", "earliest");
ParameterTool parameterTool = ParameterTool.fromMap(parameters);
return env.addSource(new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
通过以下方式进行聚合:
DataStream<AggregatedMeasures> aggregatedStream = messageStream
// Extract the timestamp from the object
.assignTimestampsAndWatermarks(new TimestampExtractor())
//Key for the Aggregation
.keyBy(new KeySelector<MeasureData, Tuple2<String, Timestamp>>() {
@Override
public Tuple2<String, Timestamp> getKey(MeasureData value) throws Exception {
return Tuple2.of(value.Id(), value.getEventTimestamp());
}
})
//Set the Time Window Duration
.timeWindow(Time.seconds(windowDuration))
//Aggregate
.aggregate(new AggregateMeasureFunction());
我对Kafka的创作是这样的:
producer = new FlinkKafkaProducer011<>(
bootstrapServers, // broker list
outputTopic, // target topic
new AggregatedMeasuresJSONSerializer()); // serialization schema
producer.setWriteTimestampToKafka(true);
messageStream.addSink(producer); // DataStream<AggregatedMeasures>
我在输入主题一百万个日志时测试了它。这个用python编写的制作人速度很慢,flink在live中工作得很好。
当我尝试阅读同样的输入主题时,这个主题已经被一百万个日志填满了,flinkproducer读取这些日志,但是没有输出所有的结果,只输出其中的一部分。
是背压吗?我不理解那种行为。
我正在使用Java8、Flink1.4和yarn。
暂无答案!
目前还没有任何答案,快来回答吧!