我创建了一个spring引导应用程序,它向kafka主题发送消息。我用的是Spring spring-integration-kafka
:一个 KafkaProducerMessageHandler<String,String>
订阅了一个频道( SubscribableChannel
)并将接收到的所有消息推送到一个主题。应用程序运行良好。我看到消息通过控制台消费者(本地kafka)到达kafka。
我还创建了一个使用 KafkaEmbedded
. 我通过订阅测试中的通道来检查预期的消息-一切正常。
但我希望测试也能检查输入Kafka的信息。可悲的是,Kafka的javadoc不是最好的。到目前为止我尝试的是:
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {
mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );
}
//...
@Test
public void endToEnd() throws Exception {
// ...
ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );
StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );
}
问题是我没有看到任何记录。我不确定我的Kafka床设置是否正确。但是信息是通过通道接收的。
1条答案
按热度按时间zsohkypk1#
这对我有用。试试看
我用倒计时锁是因为
Producer.Send(..)
是异步操作。所以我在这里做的是在一个无限循环中等待,每100毫秒轮询一次kafka,如果有新的记录,如果有,将它添加到一个列表中,以便将来Assert,然后减少倒计时。我总共会等10秒钟来确定。您也可以使用一个简单的循环,然后在几分钟后退出(如果您不希望使用countdownlatch和executorservice之类的东西)