I am trying to run a test case using the Kafka API. The test code below is taken from https://www.codenotfound.com/spring-kafka-consumer-producer-example.html
// Java application test
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
//@TestExecutionListeners( { DependencyInjectionTestExecutionListener.class })
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { SpringKafkaIntegrationApplication.class }, webEnvironment = WebEnvironment.RANDOM_PORT)
public class SpringKafkaApplicationTest {

    private static String BOOT_TOPIC = "boot.t";

    @Autowired
    private Sender sender;

    @Autowired
    private Receiver receiver;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BOOT_TOPIC);

    @Before
    public void setUp() throws Exception {
        // wait until the partitions are assigned
        // kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, factory);
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            // The call below is the one that fails
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    embeddedKafka.getPartitionsPerTopic());
        }
    }

    @Test
    public void testReceive() throws Exception {
        sender.send(BOOT_TOPIC, "Hello");
        System.out.println("check");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        // receiver.receive(BOOT_TOPIC);
        assertThat(receiver.getLatch().getCount()).isEqualTo(0);
    }
}
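The Receiver class is not shown in this post. As a rough illustration of the latch pattern that testReceive() relies on, here is a minimal standalone sketch. The class name LatchReceiverSketch is hypothetical, and the Kafka listener wiring is deliberately left out so that it runs without a broker; the real Receiver may well differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Receiver bean; not the actual class from the application.
public class LatchReceiverSketch {

    // One message is expected, so the latch starts at 1.
    private final CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    // In the real application this would be the Kafka listener callback;
    // here it is invoked directly.
    public void receive(String payload) {
        latch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        LatchReceiverSketch receiver = new LatchReceiverSketch();
        receiver.receive("Hello");
        // await returns true if the count reached zero before the timeout
        boolean completed = receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        System.out.println(completed + " " + receiver.getLatch().getCount()); // prints "true 0"
    }
}
```

If no message ever reaches receive(), await times out and getCount() stays at 1, which is what the final assertion in testReceive() would catch.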
I have defined the topic in application.yml as follows:
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: boot
kafka:
  topic:
    boot: boot.t
Inside the waitForAssignment method of the ContainerTestUtils class, the call below returns assignedPartitions = null:
int count = 0;
....
Collection<?> assignedPartitions = (Collection<?>) getAssignedPartitions.invoke(aContainer);
if (assignedPartitions != null) {
count += assignedPartitions.size();
}
...
So the count above stays at 0, and I get the following assertion failure:
org.junit.ComparisonFailure: expected:<[2]> but was:<[0]>
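To make the symptom concrete, here is a minimal, self-contained sketch of the counting logic quoted above. It is not the actual ContainerTestUtils source: the class AssignmentCountSketch, the Supplier-based "consumers", and the retry parameters are all assumptions made for illustration. It only shows how a null assignment leaves the count at 0, while the expected value (2 here, presumably the value of embeddedKafka.getPartitionsPerTopic()) is never reached.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration only; the real logic lives in ContainerTestUtils.
public class AssignmentCountSketch {

    // Sums the partitions each (hypothetical) consumer reports.
    // A null collection means "nothing assigned yet" and adds nothing.
    static int countAssigned(List<Supplier<Collection<?>>> consumers) {
        int count = 0;
        for (Supplier<Collection<?>> consumer : consumers) {
            Collection<?> assignedPartitions = consumer.get();
            if (assignedPartitions != null) {
                count += assignedPartitions.size();
            }
        }
        return count;
    }

    // Polls until the expected count is reached or the attempts run out,
    // then returns the last observed count.
    static int waitForCount(List<Supplier<Collection<?>>> consumers, int expected,
            int maxAttempts, long sleepMillis) {
        int count = countAssigned(consumers);
        for (int attempt = 1; attempt < maxAttempts && count < expected; attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            count = countAssigned(consumers);
        }
        return count;
    }

    public static void main(String[] args) {
        // Consumer that never receives an assignment, as in the failing test
        Supplier<Collection<?>> unassigned = () -> null;
        // Consumer that owns both partitions of the topic
        Supplier<Collection<?>> assigned = () -> Arrays.asList("boot.t-0", "boot.t-1");
        int expected = 2;

        System.out.println(waitForCount(Collections.singletonList(unassigned), expected, 3, 10)); // prints 0
        System.out.println(waitForCount(Collections.singletonList(assigned), expected, 3, 10));   // prints 2
    }
}
```

In the first case the loop exhausts its attempts and the final comparison of 0 against 2 fails, which matches the shape of the ComparisonFailure above. Why the container never gets an assignment in my test is the open question.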