java Flink和Kafka并行优化

x4shl7ld  于 2023-01-15  发布在  Java
关注(0)|答案(1)|浏览(123)

我在Flink中有一个项目想要优化。我已经将默认的并行度和插槽设置为4(服务器有4个内核)。

taskmanager.numberOfTaskSlots = 4
parallelism.default = 4

这是我运行任务的配置,但是使用并行或不使用并行的处理时间是相同的。在我的测试中,从具有5个分区的Kafka队列中处理30MB需要大约3分钟。

public void run() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fallBackRestart());
    // Get the Accounts DataSource
    BroadcastStream<PropertyInfo> propertyInfoBroadcastStream = getBroadcastPropertyStream(env);

    // Get the DataSource
    DataStream<CollectionMessage> collectionMessageDataStream = getCollectionMessageStream(env);
    final var router = new KeyedProcessAccumulatorRouterImpl(config);
    final Duration sessionGapDuration = config.get(SESSION_EVENT_GAP_MINUTES);
    SessionKeyedProcessFunction sessionKeyedProcessFunction = new SessionKeyedProcessFunction(
            router, sessionGapDuration, config);

        collectionMessageDataStream
          .keyBy(CollectionMessage::getSessionId)
          .connect(propertyInfoBroadcastStream)
          .process(sessionKeyedProcessFunction)
          .uid("SessionWindow")
          .name("Session Window")
          .setParallelism(4);

    // execute program
    env.execute("Processor");
}

private DataStream<CollectionMessage> getCollectionMessageStream(
        StreamExecutionEnvironment env) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", config.getString(KAFKA_CONSUMER_SERVERS));
    properties.setProperty("group.id", config.getString(KAFKA_TOPROCESS_TOPIC));
    properties.setProperty("max.partition.fetch.bytes",
        config.getString(KAFKA_TOPROCESS_MAX_BYTES));
    FlinkKafkaConsumer<RawCollection> myConsumer = new FlinkKafkaConsumer<>(
        config.getString(KAFKA_TOPROCESS_TOPIC), new KafkaMessageDeserializer(), properties);

    // Take lines from file from files
    DataStream<RawCollection> inputMessageStream =
        env.addSource(myConsumer).setParallelism(4);

    B64PayloadDeserializer b64PayloadDeserializer =
        new B64PayloadDeserializer(new BaseCollectionMessageDeserializer());

    // Map lines to messages
    DataStream<CollectionMessage> collectionMessageDataStream =
        inputMessageStream
            .map(b64PayloadDeserializer::deserialize).setParallelism(4)
            .uid("CollectionMessageFilter")
            .name("Filter Collection Messages").setParallelism(4);

    // Assign new watermark on messages based on event time
    return collectionMessageDataStream;
}

查看Flink控制面板,我看到4个插槽,4个子任务中的每一个都忙碌到接近100%。在本地执行它,然后在类SessionKeyedProcessFunction中停止。我看到4个任务并行化了。发生了什么事情没有优化性能?

ntjbwcob

ntjbwcob1#

一般来说,这比仅仅增加并行度并期望显著的加速要复杂一些。需要检查的几个问题可能会导致并行度的性能提升低于预期:
1.在Kafka中,消息是如何划分的?你有5个分区,但是实际上有多少消息在这些分区中?也许一个分区非常热,导致所有的工作都由一个操作符示例完成。

  1. sessionId是如何分布的?可能存在不对称,导致一个操作符示例完成大部分工作。
    1.您如何衡量这3分钟?这3分钟中的哪一部分实际上是正在完成的工作,而不是初始化、等待和清理?也许您用于测试的样本太小,无法实际显示加速,因为大多数时间与不可并行的工作有关。

相关问题