我在Flink中有一个项目想要优化。我已经将默认的并行度和插槽设置为4(服务器有4个内核)。
taskmanager.numberOfTaskSlots = 4
parallelism.default = 4
这是我运行任务的配置,但是使用并行或不使用并行的处理时间是相同的。在我的测试中,从具有5个分区的Kafka队列中处理30MB需要大约3分钟。
public void run() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fallBackRestart());
// Get the Accounts DataSource
BroadcastStream<PropertyInfo> propertyInfoBroadcastStream = getBroadcastPropertyStream(env);
// Get the DataSource
DataStream<CollectionMessage> collectionMessageDataStream = getCollectionMessageStream(env);
final var router = new KeyedProcessAccumulatorRouterImpl(config);
final Duration sessionGapDuration = config.get(SESSION_EVENT_GAP_MINUTES);
SessionKeyedProcessFunction sessionKeyedProcessFunction = new SessionKeyedProcessFunction(
router, sessionGapDuration, config);
collectionMessageDataStream
.keyBy(CollectionMessage::getSessionId)
.connect(propertyInfoBroadcastStream)
.process(sessionKeyedProcessFunction)
.uid("SessionWindow")
.name("Session Window")
.setParallelism(4);
// execute program
env.execute("Processor");
}
private DataStream<CollectionMessage> getCollectionMessageStream(
StreamExecutionEnvironment env) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", config.getString(KAFKA_CONSUMER_SERVERS));
properties.setProperty("group.id", config.getString(KAFKA_TOPROCESS_TOPIC));
properties.setProperty("max.partition.fetch.bytes",
config.getString(KAFKA_TOPROCESS_MAX_BYTES));
FlinkKafkaConsumer<RawCollection> myConsumer = new FlinkKafkaConsumer<>(
config.getString(KAFKA_TOPROCESS_TOPIC), new KafkaMessageDeserializer(), properties);
// Take lines from file from files
DataStream<RawCollection> inputMessageStream =
env.addSource(myConsumer).setParallelism(4);
B64PayloadDeserializer b64PayloadDeserializer =
new B64PayloadDeserializer(new BaseCollectionMessageDeserializer());
// Map lines to messages
DataStream<CollectionMessage> collectionMessageDataStream =
inputMessageStream
.map(b64PayloadDeserializer::deserialize).setParallelism(4)
.uid("CollectionMessageFilter")
.name("Filter Collection Messages").setParallelism(4);
// Assign new watermark on messages based on event time
return collectionMessageDataStream;
}
查看Flink控制面板,我看到4个插槽,4个子任务中的每一个都忙碌到接近100%。在本地执行它,然后在类SessionKeyedProcessFunction中停止。我看到4个任务并行化了。发生了什么事情没有优化性能?
1条答案
按热度按时间ntjbwcob1#
一般来说,这比仅仅增加并行度并期望显著的加速要复杂一些。需要检查的几个问题可能会导致并行度的性能提升低于预期:
1.在Kafka中,消息是如何划分的?你有5个分区,但是实际上有多少消息在这些分区中?也许一个分区非常热,导致所有的工作都由一个操作符示例完成。
sessionId
是如何分布的?可能存在不对称,导致一个操作符示例完成大部分工作。1.您如何衡量这3分钟?这3分钟中的哪一部分实际上是正在完成的工作,而不是初始化、等待和清理?也许您用于测试的样本太小,无法实际显示加速,因为大多数时间与不可并行的工作有关。