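I am trying to create an integration test against the Pub/Sub emulator, based on the example from this GitHub repository, which looks like this: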
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
  private static final String PROJECT_ID = "test-project";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  @BeforeAll
  static void setup() throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
            .usePlaintext()
            .build();
    TransportChannelProvider channelProvider =
        FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

    TopicAdminClient topicAdminClient =
        TopicAdminClient.create(
            TopicAdminSettings.newBuilder()
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setTransportChannelProvider(channelProvider)
                .build());

    SubscriptionAdminClient subscriptionAdminClient =
        SubscriptionAdminClient.create(
            SubscriptionAdminSettings.newBuilder()
                .setTransportChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build());

    PubSubAdmin admin =
        new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);

    admin.createTopic("test-topic");
    admin.createSubscription("test-subscription", "test-topic");

    admin.close();
    channel.shutdown();
  }

  // By default, autoconfiguration will initialize application default credentials.
  // For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;

  @Autowired PubSubSubscriberTemplate subscriberTemplate;
  @Autowired PubSubPublisherTemplate publisherTemplate;

  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> msgs =
        await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

    assertEquals(1, msgs.size());
    assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

    for (AcknowledgeablePubsubMessage msg : msgs) {
      msg.ack();
    }
  }

  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");

    List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker =
        new PubSubWorker(
            "test-subscription",
            subscriberTemplate,
            (msg) -> {
              messages.add(msg);
            });
    worker.start();

    await().until(() -> messages, not(empty()));
    assertEquals(1, messages.size());
    assertEquals(future.get(), messages.get(0).getMessageId());
    assertEquals("hi!", messages.get(0).getData().toStringUtf8());

    worker.stop();
  }

  @AfterEach
  void teardown() {
    // Drain any messages that are still in the subscription so that they don't interfere with
    // subsequent tests.
    await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
  }
}
The example above works fine, but when I try to test my own implementation, as shown below:
@Autowired
private FunctionCatalog catalog;

@Test
void testSendB() throws ExecutionException, InterruptedException {
  Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
  var pubSubMessage = new PubSubMessage();
  pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
  function.accept(pubSubMessage);

  List<AcknowledgeablePubsubMessage> msgs =
      await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

  assertEquals(1, msgs.size());
  assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
  assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

  for (AcknowledgeablePubsubMessage msg : msgs) {
    msg.ack();
  }
}
it throws the following error:
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).
My service implementation uses Publisher rather than the PubSubPublisherTemplate from the example:
private final Publisher publisher;

public void publishMessage(String message) {
  var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
  var pubsubApiMessage = getPubsubApiMessage(byteStr);
  try {
    publish(pubsubApiMessage);
  } catch (Exception e) {
    log.error("Error during event publishing: " + e.getMessage(), e);
    throw new RuntimeException(e);
  }
}

private void publish(PubsubMessage pubsubApiMessage) throws Exception {
  publisher.publish(pubsubApiMessage).get();
}

private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
  return PubsubMessage.newBuilder()
      .setData(byteStr)
      .build();
}
This works fine when deployed to GCP, but not in the integration test against the Pub/Sub emulator.
1 Answer
The Pub/Sub emulator needs its own test Publisher, which can be created as a bean in the configuration. Example:
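As a rough sketch of what such a test configuration might look like (not necessarily the exact setup the answer had in mind): it builds a Publisher over a plaintext gRPC channel to the emulator and without credentials, mirroring how the admin clients are created in the question's setup() method. The class name EmulatorPublisherConfiguration, the bean names, and the use of @Primary are assumptions made for illustration. The project and topic IDs are assumed to match the ones created in @BeforeAll, and the emulator host is read from the spring.cloud.gcp.pubsub.emulator-host property that the test registers via @DynamicPropertySource.

```java
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

// Hypothetical test-only configuration; the class and bean names are illustrative.
@TestConfiguration
public class EmulatorPublisherConfiguration {

  // Assumption: same project and topic that the test creates in @BeforeAll.
  private static final String PROJECT_ID = "test-project";
  private static final String TOPIC_ID = "test-topic";

  // Plaintext channel to the emulator; shut down when the test context closes.
  @Bean(destroyMethod = "shutdownNow")
  ManagedChannel emulatorChannel(
      @Value("${spring.cloud.gcp.pubsub.emulator-host}") String emulatorHost) {
    return ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build();
  }

  // @Primary is an assumption: it lets this bean win over an application-defined
  // Publisher when the service injects a Publisher by type.
  @Bean
  @Primary
  Publisher emulatorPublisher(ManagedChannel emulatorChannel) throws IOException {
    TransportChannelProvider channelProvider =
        FixedTransportChannelProvider.create(GrpcTransportChannel.create(emulatorChannel));
    return Publisher.newBuilder(TopicName.of(PROJECT_ID, TOPIC_ID))
        .setChannelProvider(channelProvider)
        .setCredentialsProvider(NoCredentialsProvider.create())
        .build();
  }
}
```

If this lives in its own top-level class rather than as a nested static class of the test, it would need to be pulled in explicitly, for example with @Import(EmulatorPublisherConfiguration.class) on the test class.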