java—从flink中的kafka聚合数据的问题

d8tt03nd  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(295)

我创建了一个简单的flink应用程序,它从kafka主题读取日志,在一个持续时间为2秒、水印为1秒的窗口中聚合日志。然后我将结果发送到另一个Kafka主题。应用程序部署在集群上。
我这样消费Kafka:

Map<String, String> parameters = new HashMap<String, String>();
    parameters.put("bootstrap.servers", bootstrapServers);
    parameters.put("group.id", groupId);
    parameters.put("zookeeper.connect", zookeeperConnect);
    parameters.put("topic", inputTopic);
    parameters.put("auto.offset.reset", "earliest");

ParameterTool parameterTool = ParameterTool.fromMap(parameters);

return env.addSource(new FlinkKafkaConsumer011<>(parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));

通过以下方式进行聚合:

DataStream<AggregatedMeasures> aggregatedStream = messageStream

    // Extract the timestamp from the object
    .assignTimestampsAndWatermarks(new TimestampExtractor())

    //Key for the Aggregation
    .keyBy(new KeySelector<MeasureData, Tuple2<String, Timestamp>>() {
      @Override
      public Tuple2<String, Timestamp> getKey(MeasureData value) throws Exception {
        return Tuple2.of(value.Id(), value.getEventTimestamp());
      }
    })

    //Set the Time Window Duration
    .timeWindow(Time.seconds(windowDuration))

    //Aggregate
    .aggregate(new AggregateMeasureFunction());

我对Kafka的创作是这样的:

producer = new FlinkKafkaProducer011<>(
    bootstrapServers, // broker list
    outputTopic, // target topic
    new AggregatedMeasuresJSONSerializer()); // serialization schema

producer.setWriteTimestampToKafka(true);

messageStream.addSink(producer); // DataStream<AggregatedMeasures>

我在输入主题一百万个日志时测试了它。这个用python编写的制作人速度很慢,flink在live中工作得很好。
当我尝试阅读同样的输入主题时,这个主题已经被一百万个日志填满了,flinkproducer读取这些日志,但是没有输出所有的结果,只输出其中的一部分。
是背压吗?我不理解那种行为。
我正在使用Java8、Flink1.4和yarn。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题