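I want to create an integration test for the pack and label functions (pipe-composed), based on Spring Cloud Function. I know the configured input binding is waiting for messages from the order-accepted.dispatcher-service queue of the order-accepted exchange, and that Spring Cloud Stream will automatically send the resulting messages to the order-dispatched exchange. So I only need to send a message to the order-accepted.dispatcher-service queue and check the message received from the order-dispatched exchange. But I don't know the routing key, so how can I send a message to the order-accepted.dispatcher-service queue? Below are my configuration file and test class so far: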
- application.yml:
server:
  port: 9003
spring:
  application:
    name: dispatcher-service
  cloud:
    function:
      definition: pack|label
    stream:
      bindings:
        packlabel-in-0:
          destination: order-accepted
          group: ${spring.application.name}
        packlabel-out-0:
          destination: order-dispatched
  rabbitmq:
    host: localhost
    port: 5672
    username: user
    password: password
    connection-timeout: 5s
- Integration test class (how should this integration test be implemented?):
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class DispatcherServiceApplicationTests {

    @Container
    static RabbitMQContainer rabbitMQ = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.10-management"));

    @DynamicPropertySource
    static void rabbitMQProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.rabbitmq.host", rabbitMQ::getHost);
        registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort);
        registry.add("spring.rabbitmq.username", rabbitMQ::getAdminUsername);
        registry.add("spring.rabbitmq.password", rabbitMQ::getAdminPassword);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    void contextLoads() {
    }

    @Test
    void packAndLabel() {
        long orderId = 121;

        Queue inputQueue = this.rabbitAdmin.declareQueue();
        assert inputQueue != null;
        Binding inputBinding = new Binding(inputQueue.getName(), Binding.DestinationType.QUEUE, "order-accepted", "#", null);

        Queue outputQueue = this.rabbitAdmin.declareQueue();
        assert outputQueue != null;
        Binding outputBinding = new Binding(outputQueue.getName(), Binding.DestinationType.QUEUE, "order-dispatched", "#", null);

        this.rabbitAdmin.declareBinding(inputBinding);
        this.rabbitAdmin.declareBinding(outputBinding);

        // I think this only sends the message to the order-accepted exchange rather than to the order-accepted.dispatcher-service queue.
        rabbitTemplate.convertAndSend("order-accepted", "#", new OrderAcceptedMessage(orderId));

        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            OrderDispatchedMessage message = rabbitTemplate.receiveAndConvert(outputQueue.getName(),
                    10000, new ParameterizedTypeReference<OrderDispatchedMessage>(){});
            assert message != null;
            assertThat(message.orderId()).isEqualTo(orderId);
            System.out.println("------------------------------------: " + message.orderId());
        });
    }
}
1 Answer
We use Testcontainers ourselves, so you can take a look at some of our tests for the Rabbit binder: https://github.com/spring-cloud/spring-cloud-stream/blob/8f5e7d75f23f9463ecc4f1e8372685b1c01d97d4/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitBinderModuleTests.java#L99
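On the routing-key part of the question: as far as I understand the Rabbit binder defaults (an assumption, since the posted configuration does not set any binder-specific properties), the destination is declared as a topic exchange and the consumer queue is bound to it with the pattern `#`. Under that assumption the routing key does not matter, because `#` matches every key. The following is only a plain-Java model of topic-pattern matching to illustrate the point; `TopicMatcher` is a hypothetical helper, not part of Spring or RabbitMQ.

```java
// Hypothetical illustration of AMQP topic-exchange matching; not a Spring or RabbitMQ API.
class TopicMatcher {

    // Returns true if the binding pattern would route a message with the given routing key.
    // Words are separated by '.', '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted as well.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The pattern '#' (assumed binder default) accepts any routing key.
        System.out.println(matches("#", "#"));
        System.out.println(matches("#", "any.key.at.all"));
        System.out.println(matches("#", ""));
        // A narrower pattern, for contrast.
        System.out.println(matches("order.*", "order.accepted"));
        System.out.println(matches("order.*", "order.accepted.eu"));
    }
}
```

If the defaults hold, the `rabbitTemplate.convertAndSend("order-accepted", "#", ...)` call in the question should already reach the order-accepted.dispatcher-service queue, since the literal key `#` matches the pattern `#` like any other key. Another option is to publish to the default exchange (the empty string `""`) with the queue name as the routing key, which in AMQP routes directly to the queue of that name.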