I have the following producer:
@Outgoing("platform-outcode-stats")
public Multi<OutcodeStats> produceOutCodeStats() {
    return outcodeStatsResource
            .getUkOutCodeStats()
            .onFailure().retry().atMost(1)
            .onOverflow().buffer(100);
}
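It publishes a stream of payloads to the "platform-outcode-stats" topic. As stated in the documentation, this method is called only once to retrieve the publisher. So far, so good.
I want to write a unit test that checks whether the topic is created and populated correctly. To do that, I use Testcontainers to start a single Kafka server like this: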
public class KafkaServerResource implements QuarkusTestResourceLifecycleManager {

    KafkaContainer kafkaServer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));

    @Override
    public Map<String, String> start() {
        kafkaServer.start();
        // Point the application at the broker started by Testcontainers.
        return Collections.singletonMap("kafka.bootstrap.servers", kafkaServer.getBootstrapServers());
    }

    @Override
    public void stop() {
        kafkaServer.stop();
    }
}
I created a dummy test to see what happens:
@Inject
@Channel("platform-outcode-stats")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
Publisher<OutcodeStats> outcodeStats;

@Test
void topicShouldHaveStats() {
    Flowable.fromPublisher(outcodeStats)
            .toList()
            .blockingGet()
            .forEach(el -> System.out.println(el.getOutcode()));
}
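This test does not actually assert anything, but I wanted to see whether anything gets printed to the console. Well, because no message is acknowledged, Kafka stays stuck forever with the following log: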
2020-10-24 18:00:21,173 INFO [org.apa.kaf.cli.con.KafkaConsumer] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Subscribed to topic(s): platform-outcode-stats
2020-10-24 18:00:21,220 WARN [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Error while fetching metadata with correlation id 3 : {platform-outcode-stats=LEADER_NOT_AVAILABLE}
2020-10-24 18:00:21,220 INFO [org.apa.kaf.cli.Metadata] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Cluster ID: Yuo_aAylRgWyqJWKPV-zHQ
2020-10-24 18:00:21,311 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Discovered group coordinator localhost:32993 (id: 2147483646 rack: null)
2020-10-24 18:00:21,313 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,348 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-24 18:00:21,348 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,376 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Finished assignment for group at generation 1: {consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7=Assignment(partitions=[platform-outcode-stats-0])}
2020-10-24 18:00:21,416 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Successfully joined group with generation 1
2020-10-24 18:00:21,420 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Adding newly assigned partitions: platform-outcode-stats-0
2020-10-24 18:00:21,433 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Found no committed offset for partition platform-outcode-stats-0
2020-10-24 18:00:21,447 INFO [org.apa.kaf.cli.con.int.SubscriptionState] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Resetting offset for partition platform-outcode-stats-0 to offset 0.
2020-10-24 18:03:36,158 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Revoke previously assigned partitions platform-outcode-stats-0
2020-10-24 18:03:36,158 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Member consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7 sending LeaveGroup request to coordinator localhost:32993 (id: 2147483646 rack: null) due to the consumer is being closed
2020-10-24 18:03:36,200 INFO [org.apa.kaf.cli.pro.KafkaProducer] (vert.x-worker-thread-1) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-10-24 18:03:36,227 INFO [io.sma.rea.mes.provider] (Quarkus Test Cleanup Shutdown task) SRMSG00207: Cancel subscriptions
2020-10-24 18:03:36,238 INFO [io.quarkus] (Quarkus Test Cleanup Shutdown task) Quarkus stopped in 0.100s
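One thing that may be worth checking, independent of acknowledgment: toList() only emits once the upstream completes, and a Kafka-backed stream normally never completes, so blockingGet() can wait until the test is shut down. Below is a minimal sketch of the usual workaround, which is to collect a bounded number of items and wait with a timeout. It uses the JDK's Flow API as a stand-in so that it runs without Quarkus or a broker; BoundedCollect and takeFirst are hypothetical names, not part of Reactive Messaging or RxJava. With RxJava, the analogous operators would be take(n) and timeout(...) on the Flowable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BoundedCollect {

    // Hypothetical helper: completes with the first `limit` items (limit > 0),
    // or with fewer items if the source completes earlier.
    static <T> CompletableFuture<List<T>> takeFirst(Flow.Publisher<T> source, int limit) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(limit); // never ask for more than we intend to collect
            }

            @Override
            public void onNext(T item) {
                items.add(item);
                if (items.size() == limit) {
                    subscription.cancel(); // stop consuming the endless stream
                    result.complete(List.copyOf(items));
                }
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(List.copyOf(items));
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        // SubmissionPublisher stands in for a stream that does not complete on its own.
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            CompletableFuture<List<String>> firstTwo = takeFirst(source, 2);
            source.submit("AB1");
            source.submit("AB2");
            source.submit("AB3");
            // get(timeout) throws TimeoutException instead of hanging if too few items arrive.
            System.out.println(firstTwo.get(5, TimeUnit.SECONDS));
        }
    }
}
```

The point of the design is that the test fails fast with a TimeoutException when nothing arrives, instead of blocking until the test run is aborted.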
I am surely missing something here, or I am simply doing something I shouldn't. Any help would be greatly appreciated.