如何测试consumerawarerebalancelistener?

tyg4sfes  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(389)

我开发了一个 @KafkaListener 这也被标记为 ConsumerAwareRebalanceListener 接口,使用SpringBoot2.0.6。我实现了 onPartitionsAssigned 方法,其中我倒带固定时间的偏移量,比如说60秒。
到现在为止,一直都还不错。
如何使用SpringKafka提供的工具测试上述用例?我想我需要开始一个Kafka经纪人(即 EmbeddedKafka ),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取在最后60秒内到达的消息。
有人能帮我吗?我搜索了一下,但什么也没找到。谢谢。

gojuced7

gojuced71#

public class MyRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long rewindTo = System.currentTimeMillis() - 60000;
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
        offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
    }

}

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {

    @Test
    public void rebalanceListenerTests() {
        MyRebalanceListener listener = new MyRebalanceListener();
        Consumer<?, ?> consumer = mock(Consumer.class);
        AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
        given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
            AtomicLong offset = new AtomicLong();
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
            Map<TopicPartition, Long> argument = i.getArgument(0);
            argument.forEach((k, v) -> {
                offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
                assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
            });
            return offsetsForTimes ;
        });
        TopicPartition t1 = new TopicPartition("foo", 0);
        TopicPartition t2 = new TopicPartition("foo", 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(t1);
        partitions.add(t2);
        listener.onPartitionsAssigned(consumer, partitions);
        verify(consumer).seek(t1, 1);
        verify(consumer).seek(t2, 2);
    }

}
7ajki6be

7ajki6be2#

这个 @KafkaListener 具有:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

属性,以便可以访问其 MessageListenerContainer 提到via KafkaListenerEndpointRegistry ,你可以简单地 @Autowired 进入基于spring测试框架的测试类。那么,你真的可以 stop() 以及 start() 那个 MessageListenerContainer 在你的测试方法中。
也要注意怎么做 @KafkaListener 有一个 autoStartup() 属性也是。

相关问题