'@RunWith(SpringRunner.class)@SpringBootTest @DirtiesContext @嵌入式Kafka(分区= 1,主题= {嵌入式Kafka集成测试.测试_主题},引导服务器属性=“spring.kafka.引导服务器”)@ActiveProfiles(“测试”)公共类嵌入式Kafka集成测试{
static final String TEST_TOPIC = "MERCHANT-SERVICE-BILLABLE-EVENTS-DPG-IN";
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Autowired
private MessagePublisher producer;
@Autowired
private KafkaConsumer consumer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Captor
ArgumentCaptor<ConsumerRecord<String, BillableEventsRequest>> billableEventsRequestArgumentCaptor;
@Captor
ArgumentCaptor<String> topicArgumentCaptor;
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}`
@Test
public void embeddedKafka_whenSendingToProducer_thenMessageReceived() throws IOException, ParseException {
//Producer
BillableEventsRequest event = createTestRequest();
producer.publish(event);
//consumer
verify(consumer, timeout(1000).times(1)).consume(billableEventsRequestArgumentCaptor.capture());
ConsumerRecord<String, BillableEventsRequest> payload = billableEventsRequestArgumentCaptor.getValue();
assertNotNull(payload);
assertTrue(TEST_TOPIC.contains(topicArgumentCaptor.getValue()));
testEvents(payload,event);
}
}
Spring Boot :v2.3.5.RELEASE
Kafka版本:2.5.1
Apache Camel 3.9.0
Java:版本11.0.15
Zookeeper。版本=3.5.9- 83 df 9301 aa 5c 2a 5d284 a9940177808 c 01 bc 3 5cef,制造日期:2021年1月6日20:03 GMT
当我在本地开始我的嵌入式Kafka测试时,它会给我下面的信息,
错误日志:
当为spring.kafka.producer.bootstrap-servers提供IP地址9092时
[Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:50.643 WARN 1820 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected
2022-10-17 17:49:52.006 WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (INPNQLT896845.in.db.com/10.165.101.110:9092) could not be established. Broker may not be available.
2022-10-17 17:49:52.006 WARN 1820 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker 10.165.101.110:9092 (id: -1 rack: null) disconnected
当为spring.kafka.producer.bootstrap-servers指定localhost:9092时
2022-10-17 17:19:26.743 WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:26.743 WARN 27272 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sbilling-billable-events-dpg-in-consumer-1, groupId=sbilling-billable-events-dpg-in-consumer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2022-10-17 17:19:27.687 WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-10-17 17:19:27.688 WARN 27272 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
在日志的最后,我得到了这样的,
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 73.128 s <<< FAILURE! - in com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest
[ERROR] embeddedKafka_whenSendingToProducer_thenMessageReceived Time elapsed: 60.24 s <<< ERROR!
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.
at com.db.spricing.ms.broker.EmbeddedKafkaIntegrationTest.embeddedKafka_whenSendingToProducer_thenMessageReceived(EmbeddedKafkaIntegrationTest.java:89)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.in.topic not present in metadata after 60000 ms.
1条答案
按热度按时间but5z9lq1#
很难帮助您,因为该 Boot 版本已经有一段时间不受支持:https://spring.io/projects/spring-boot#support。
看看此选项是否对您有帮助:https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.embedded。
我不知道你为什么要设置
spring.kafka.producer.bootstrap-servers
,如果你需要回复Kafka为你公开的任何嵌入式内容。