KafkaMessageListenerContainer.getAssignedPartitions returns 0

q3qa4bjr  posted on 2021-06-07 in Kafka

I am trying to run a test case against the Kafka API. The test case code below is taken from https://www.codenotfound.com/spring-kafka-consumer-producer-example.html

// Java application test

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
//@TestExecutionListeners( { DependencyInjectionTestExecutionListener.class })
@RunWith(SpringRunner.class)
@SpringBootTest(classes= {SpringKafkaIntegrationApplication.class },webEnvironment = WebEnvironment.RANDOM_PORT)
public class SpringKafkaApplicationTest {

  private static final String BOOT_TOPIC = "boot.t";

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BOOT_TOPIC);

  @Before
  public void setUp() throws Exception {
    // wait until the partitions are assigned
    //  kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, factory);
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
      // The call below is the one that fails
      ContainerTestUtils.waitForAssignment(messageListenerContainer,
          embeddedKafka.getPartitionsPerTopic());
    }
  }

  @Test
  public void testReceive() throws Exception {
    sender.send(BOOT_TOPIC, "Hello");
    System.out.println("check");
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
   // receiver.receive(BOOT_TOPIC);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}

I have defined the topic in application.yml as follows:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: boot

kafka:
  topic:
    boot: boot.t

Inside the ContainerTestUtils class, in the waitForAssignment method, the call below returns assignedPartitions = null:

int count = 0;
....
Collection<?> assignedPartitions = (Collection<?>) getAssignedPartitions.invoke(aContainer);
if (assignedPartitions != null) {
    count += assignedPartitions.size();
}
...

So the count above stays at 0, and the assertion fails as follows:

org.junit.ComparisonFailure: expected:<[2]> but was:<[0]>
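
To make the counting behaviour concrete, here is a minimal sketch that reproduces the same pattern without Spring Kafka. FakeContainer, countAssigned and the partition names are hypothetical stand-ins invented for illustration; this is not the library's implementation, only the null-check-and-sum logic from the fragment quoted above. It assumes a container that returns null from getAssignedPartitions until an assignment has happened.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class AssignmentCountSketch {

    // Hypothetical stand-in for a listener container; NOT a Spring Kafka class.
    public static class FakeContainer {
        private final Collection<String> assigned;

        public FakeContainer(Collection<String> assigned) {
            this.assigned = assigned;
        }

        // Assumption: returns null while no partitions have been assigned yet.
        public Collection<String> getAssignedPartitions() {
            return assigned;
        }
    }

    // Sums the assigned partitions over all containers via reflection,
    // skipping containers whose getAssignedPartitions() result is null.
    public static int countAssigned(List<?> containers) {
        int count = 0;
        for (Object container : containers) {
            try {
                Method getAssignedPartitions =
                        container.getClass().getMethod("getAssignedPartitions");
                Collection<?> assignedPartitions =
                        (Collection<?>) getAssignedPartitions.invoke(container);
                if (assignedPartitions != null) {
                    count += assignedPartitions.size();
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Reflection call failed", e);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // A container with no assignment contributes nothing to the count.
        FakeContainer unassigned = new FakeContainer(null);
        // A container holding two (made-up) partitions contributes 2.
        FakeContainer assigned = new FakeContainer(Arrays.asList("boot.t-0", "boot.t-1"));

        System.out.println(countAssigned(Collections.singletonList(unassigned))); // prints 0
        System.out.println(countAssigned(Collections.singletonList(assigned)));   // prints 2
    }
}
```

With the unassigned container the count never leaves 0, which matches the "expected 2 but was 0" shape of the failure above: the expected value is supplied by the test, while the actual value is whatever the containers report.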
