My test fails because the "subscriber" field is missing: IllegalArgumentException: 'subscriber' must not be null in the "...endpoint"
@Test
public void test() throws InterruptedException {
    ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
    CountDownLatch receiveLatch = new CountDownLatch(1);
    MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
    this.mockIntegrationContext
            .substituteMessageHandlerFor(
                    "test2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1",
                    mockMessageHandler);
    this.integrationFlowWithReactiveConsumerHandler.getInputChannel().send(new GenericMessage<>("test2"));
    assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
    verify(mockMessageHandler).handleMessage(any());
    assertThat(captor.getValue().getPayload())
            .isEqualTo("reactive-message-text");
}
When the endpoint is a ReactiveStreamsConsumer, the call to the substituteMessageHandlerFor method in MockIntegrationContext.java fails:
public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR - complexity
        MessageHandler mockMessageHandler, boolean autoStartup) {
    Object endpoint = this.beanFactory.getBean(consumerEndpointId, IntegrationConsumer.class);
    if (autoStartup && endpoint instanceof Lifecycle) {
        ((Lifecycle) endpoint).stop();
    }
    DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
    Object targetMessageHandler = directFieldAccessor.getPropertyValue(HANDLER);
    Assert.notNull(targetMessageHandler, () -> "'handler' must not be null in the: " + endpoint);
    if (endpoint instanceof ReactiveStreamsConsumer) {
        Object targetSubscriber = directFieldAccessor.getPropertyValue("subscriber");
        Assert.notNull(targetSubscriber, () -> "'subscriber' must not be null in the: " + endpoint);
The handler endpoint bean is ...:reactive-outbound-channel-adapter, which is instantiated with the ReactiveStreamsConsumer constructor that sets the "subscriber" field to null:
/**
 * Instantiate an endpoint based on the provided {@link MessageChannel} and {@link ReactiveMessageHandler}.
 * @param inputChannel the channel to consume in reactive manner.
 * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to process messages.
 * @since 5.3
 */
public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandler reactiveMessageHandler) {
    Assert.notNull(inputChannel, "'inputChannel' must not be null");
    this.inputChannel = inputChannel;
    this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
    this.reactiveMessageHandler = reactiveMessageHandler;
    this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
    this.subscriber = null;
    this.lifecycleDelegate =
            reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
}
During the test, the endpoint bean is created through this constructor, and substituteMessageHandlerFor then throws the exception because the subscriber field is null.
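To make the failure mode concrete, here is a minimal plain-Java sketch of it. It does not use Spring Integration at all: ModelEndpoint, withSubscriber, withReactiveHandler and substitute are hypothetical names that only mimic the two construction paths and the null checks quoted above.

```java
import java.util.function.Consumer;

class SubscriberNullDemo {

    /** Hypothetical stand-in for an endpoint; NOT the real ReactiveStreamsConsumer. */
    static final class ModelEndpoint {
        final Object handler;
        final Consumer<String> subscriber;

        private ModelEndpoint(Object handler, Consumer<String> subscriber) {
            this.handler = handler;
            this.subscriber = subscriber;
        }

        // Models a construction path where a subscriber is always present.
        static ModelEndpoint withSubscriber(Consumer<String> subscriber) {
            return new ModelEndpoint(subscriber, subscriber);
        }

        // Models the reactive-handler construction path: subscriber is left null.
        static ModelEndpoint withReactiveHandler(Object reactiveHandler) {
            return new ModelEndpoint(reactiveHandler, null);
        }
    }

    // Models the null checks that run before a mock handler is substituted.
    static String substitute(ModelEndpoint endpoint) {
        if (endpoint.handler == null) {
            throw new IllegalArgumentException("'handler' must not be null");
        }
        if (endpoint.subscriber == null) {
            throw new IllegalArgumentException("'subscriber' must not be null");
        }
        return "substituted";
    }

    public static void main(String[] args) {
        System.out.println(substitute(ModelEndpoint.withSubscriber(m -> { })));
        try {
            substitute(ModelEndpoint.withReactiveHandler("reactive handler"));
        }
        catch (IllegalArgumentException ex) {
            System.out.println("failed: " + ex.getMessage());
        }
    }
}
```

In this model the handler check passes for both endpoints, and only the endpoint built from a reactive handler trips the subscriber check, which matches what I see in the test.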
The integration flow is simple: it has only a single reactive DB handler.
Any ideas? Thanks.
1 Answer
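This is a bug in the testing framework: when we introduced ReactiveMessageHandler support into ReactiveStreamsConsumer, we simply missed adjusting the MockIntegrationContext logic accordingly. At the moment there is no reasonable workaround that lets you mock the reactive endpoint itself. However, you can introduce an intermediate endpoint into the flow, such as bridge(), and mock only that endpoint so that it does not return any result. Your test will then pass, and nothing will ultimately be sent to the real reactive endpoint. Feel free to raise a GH issue and we will look into it shortly.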
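The idea behind the workaround can be sketched in plain Java, without any Spring Integration types. Everything here is a hypothetical model: ModelFlow, the endpoint ids "bridgeBeforeDb" and "reactiveDbHandler", and the convention that a null result ends the flow are assumptions made for illustration, not the framework's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BridgeMockDemo {

    /** Hypothetical flow model: ordered, named endpoints; a null result ends the flow. */
    static final class ModelFlow {
        private final Map<String, Function<String, String>> endpoints = new LinkedHashMap<>();

        ModelFlow endpoint(String id, Function<String, String> handler) {
            this.endpoints.put(id, handler);
            return this;
        }

        // Replaces the handler of an existing endpoint, as a mock substitution would.
        void substitute(String id, Function<String, String> mock) {
            if (!this.endpoints.containsKey(id)) {
                throw new IllegalArgumentException("no endpoint with id: " + id);
            }
            // Re-putting an existing key keeps its position in a LinkedHashMap.
            this.endpoints.put(id, mock);
        }

        void send(String payload) {
            String current = payload;
            for (Function<String, String> handler : this.endpoints.values()) {
                current = handler.apply(current);
                if (current == null) {
                    return; // no reply: nothing travels further downstream
                }
            }
        }
    }

    // Builds bridge -> reactive DB handler, mocks only the bridge, then sends one message.
    static void sendWithMockedBridge(String payload, List<String> seenByMock, List<String> reachedDb) {
        ModelFlow flow = new ModelFlow()
                .endpoint("bridgeBeforeDb", m -> m)
                .endpoint("reactiveDbHandler", m -> {
                    reachedDb.add(m);
                    return null;
                });
        // The mock records the message and returns nothing.
        flow.substitute("bridgeBeforeDb", m -> {
            seenByMock.add(m);
            return null;
        });
        flow.send(payload);
    }

    public static void main(String[] args) {
        List<String> seenByMock = new ArrayList<>();
        List<String> reachedDb = new ArrayList<>();
        sendWithMockedBridge("test2", seenByMock, reachedDb);
        System.out.println("mock saw: " + seenByMock);
        System.out.println("db received: " + reachedDb);
    }
}
```

In the real flow, the same shape would mean giving the bridge() endpoint an explicit id and passing that id to substituteMessageHandlerFor in place of the reactive endpoint's id, so the assertions run against the mock while the reactive handler never receives the message.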