斯普林卡利斯特纳|读同样的信息

lpwwtiir  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(296)

我有一个主题名“user”,其中userid将被删除,我想阅读这个主题并处理下面的功能

1. process user leave data
2. process user salary data

我想让两个侦听器指向同一个主题,读取同一个用户id并并行启动处理。

@KafkaListener(topics = "${kafka.topic.user}",group="abc"))
            public void receive(String message) {

                    userService.processLeave(message);
                }

@KafkaListener(topics = "${kafka.topic.user}",group="abc1"))
            public void receive1(String message) {

                    userService.processPayRoll(message);
                }

但我一直看到,processpayroll总是被调用。
我错过了什么?

xsuvu9jc

xsuvu9jc1#

嗯,看起来你用的是老版的SpringKafka。
不幸的是 group 与消费者的 group.id . 就是这样 containerGroup 用于生命周期管理。
您应该考虑配置不同的 KafkaMessageListenerContainer 基于不同的用户配置。在那里你已经可以配置不同的 ConsumerConfig.GROUP_ID_CONFIG .
最新的版本有 @KafkaListener 结构如下:

/**
 * If provided, the listener container for this listener will be added to a bean
 * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
 * This allows, for example, iteration over the collection to start/stop a subset
 * of containers.
 * @return the bean name for the group.
 */
String containerGroup() default "";

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;

相关问题