apache-kafka: Embedded Kafka JUnit test fails to execute

Posted by yyyllmsg on 2022-11-01 in Apache

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {EmbeddedKafkaIntegrationTest.TEST_TOPIC}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@ActiveProfiles("test")
public class EmbeddedKafkaIntegrationTest {

static final String TEST_TOPIC = "MERCHANT-SERVICE-BILLABLE-EVENTS-DPG-IN";
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Autowired
private MessagePublisher producer;
@Autowired
private KafkaConsumer consumer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Captor
ArgumentCaptor<ConsumerRecord<String, BillableEventsRequest>> billableEventsRequestArgumentCaptor;
@Captor
ArgumentCaptor<String> topicArgumentCaptor;
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}

@Test
public void embeddedKafka_whenSendingToProducer_thenMessageReceived() throws IOException, ParseException {
    //Producer
    BillableEventsRequest event = createTestRequest();
    producer.publish(event);
    //consumer
    verify(consumer, timeout(1000).times(1)).consume(billableEventsRequestArgumentCaptor.capture());
    ConsumerRecord<String, BillableEventsRequest> payload = billableEventsRequestArgumentCaptor.getValue();
    assertNotNull(payload);
    assertTrue(TEST_TOPIC.contains(topicArgumentCaptor.getValue()));
    testEvents(payload,event);
}

}
Spring Boot: v2.3.5.RELEASE
Kafka version: 2.5.1
Apache Camel: 3.9.0
Java: version 11.0.15
zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT
When I run my embedded Kafka test locally, I get the output below.

Error log:

When spring.kafka.producer.bootstrap-servers is set to the machine's IP address with port 9092:

[Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:50.643  WARN 1820 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected

2022-10-17 17:49:52.006  WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (INPNQLT896845.in.db.com/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:52.006  WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected

When spring.kafka.producer.bootstrap-servers is set to localhost:9092:

2022-10-17 17:19:26.743  WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:26.743  WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

2022-10-17 17:19:27.687  WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:27.688  WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

At the end of the log I get this:

[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 73.128 s <<< FAILURE! - in com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest
[ERROR] embeddedKafka_whenSendingToProducer_thenMessageReceived  Time elapsed: 60.24 s  <<< ERROR!
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.
    at com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest.embeddedKafka_whenSendingToProducer_thenMessageReceived(EmbeddedKafkaIntegrationTest.java:89)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.

Answer #1 (but5z9lq)

It's hard to help you, since that Spring Boot version has been out of support for a while: https://spring.io/projects/spring-boot#support.
See whether this option helps: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.embedded.

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

I'm not sure why you set spring.kafka.producer.bootstrap-servers at all if you need to rely on whatever address the embedded Kafka broker exposes for you.
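To illustrate why that producer-specific property matters: in Spring Boot's Kafka configuration, a client-specific setting such as spring.kafka.producer.bootstrap-servers takes precedence over the common spring.kafka.bootstrap-servers, so a hard-coded localhost:9092 would hide the address published by the embedded broker. The sketch below is only a simplified model of that precedence in plain Java. The class BootstrapServersPrecedence, the method resolve, and the port 54321 are hypothetical names and values chosen for illustration, not Spring Boot's actual code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the property precedence; not Spring Boot's implementation. */
final class BootstrapServersPrecedence {

    /**
     * Returns the bootstrap servers a client ("producer" or "consumer") would use:
     * the client-specific key when it is present, otherwise the common key.
     */
    static String resolve(Map<String, String> props, String client) {
        String specific = props.get("spring.kafka." + client + ".bootstrap-servers");
        if (specific != null) {
            return specific;
        }
        return props.get("spring.kafka.bootstrap-servers");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Stand-in for the address the embedded broker publishes; the port is made up.
        props.put("spring.kafka.bootstrap-servers", "127.0.0.1:54321");
        // Hard-coded override, as described in the question.
        props.put("spring.kafka.producer.bootstrap-servers", "localhost:9092");

        System.out.println(resolve(props, "producer")); // prints "localhost:9092"
        System.out.println(resolve(props, "consumer")); // prints "127.0.0.1:54321"
    }
}
```

Under this model, removing the producer-specific entry makes the producer fall back to the common property, which is the one the answer suggests pointing at ${spring.embedded.kafka.brokers}.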
