我在代码中做了更多更少的设置:
// loop over the inTopicName(s) {
KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
stringInput.filter( streamFilter::passOrFilterMessages ).map( processor_i ).to( outTopicName );
// } end of loop
streams = new KafkaStreams( kBuilder, streamsConfig );
streams.cleanUp();
streams.start();
如果num.stream.threads>1,那么如何将任务分配给准备好的和分配好的(循环中的)线程?
我假设(我不确定)有线程池,并且使用某种循环策略将任务分配给线程,但是可以在运行时完全动态地完成,也可以在开始时通过创建筛选/Map到结构来完成。
特别是当一个主题是计算密集型任务而另一个主题不是时,我很感兴趣。有没有可能应用程序会因为所有线程都将分配给处理器而耗尽时间。
让我们来看看场景: num.stream.threads=2
, no. partitions=4
每个主题, no. topics=2
(大主题和小主题)我的问题中的循环在应用程序启动时完成一次。如果在循环中我定义了两个主题,并且我知道从一个主题来的消息是重的(大的主题),从另一个主题来的是轻的消息(小的主题)。num.stream.threads中的两个线程是否都可能只忙于处理来自大型主题的任务?来自slimèu主题的消息必须等待处理吗?
2条答案
按热度按时间xa9qqrwz1#
如果num.stream.threads>1,那么如何将任务分配给准备好的和分配好的(循环中的)线程?
使用分区分组器将任务分配给线程。你可以在这里读到。afaik是在重新平衡后调用的,所以这不是一个非常动态的过程。也就是说,我认为没有饥饿的选择。
yqlxgs2m2#
在内部,kafka流基于分区创建任务。继续循环示例,假设有3个输入主题a、b、c,分别有2、4和3个分区。为此,您将获得4个任务(即,所有主题上的最大分区数),并具有以下分区到任务分配:
t0:a-0、b-0、c-0
t1:a-1、b-1、c-1
t2段: b-2,c-2
t3: b-3级
分区按“编号”分组并分配给相应的任务。这是在运行时(即调用
KafakStreams#start()
)因为在此之前,每个主题的分区数是未知的。如果您不了解kafka流的所有内部细节,则不建议将分区分组,因为您很容易破坏这些内容!
关于线程:任务限制线程的数量。对于我们的示例,这意味着您最多可以有4个线程(如果您有更多线程,那么这些线程将处于空闲状态,因为没有剩余的任务用于线程分配)。如何“分发”这些线程取决于您。您可以有4个单线程应用程序示例,也可以是一个具有4个线程的单线程应用程序示例(或介于两者之间的任何应用程序示例)。
如果任务数少于线程数,则将根据任务数(假定所有任务都具有相同的负载)以负载平衡的方式分配任务。