slow消费者throuput

pkbketx9  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(324)

使用spring集成kafka扩展和以下配置:

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
    zk-session-timeout="10000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
                group-id="realtime-services-consumer-grp" 
                value-decoder="purchaseDecoder" 
                key-decoder="kafkaReflectionDecoder"
                max-messages="5" >
            <int-kafka:topic id="purchase" streams="1" />
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration 
                group-id="realtime-services-consumer-gw"
                value-decoder="eventDecoder" 
                key-decoder="kafkaReflectionDecoder" 
                max-messages="10" >
            <int-kafka:topic id="event" streams="1" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

例如,当我评论第一个 consumer-configuration 我每分钟可以有300个活动没有问题。但当两者都被激活时。我的吞吐量很低。来自这两个主题的总吞吐量小于每分钟50。
有人知道为什么我在读2题的时候成绩这么差吗?配置中我做错了什么?

eqfvzcg8

eqfvzcg81#

谢谢你指出这一点!
在与我当地的Kafka碎纸机大吵了一架之后,我已经能够重现你的问题,我为你准备了一些解决办法:-)。
首先不是 round-robin ,但一个接一个:

for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
    Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive();

每一个 consumerConfiguration 在那期间被封锁在后台 consumer-timeout="5000" ,如果 KafkaStream 马上。因此是完整的 poll 来自的任务 <int-kafka:inbound-channel-adapter> 如果每个主题都没有消息,那么整个等待时间就是超时的总和!
为了克服这个问题,你可以减少 consumer-timeout="5000" 或者提供几个 <int-kafka:consumer-context> 因此 <int-kafka:inbound-channel-adapter> 对于每个主题。
是的,它看起来很奇怪,这是真的很糟糕,我们还没有找到一个时间来看看这个版本之前,但无论如何,请随时提出吉拉问题来解决它。
谢谢您!

相关问题