Java:使用Testcontainers的PubSub模拟器测试GCP PubSub

zdwk9cvp  于 2023-05-27  发布在  Java
关注(0)|答案(1)|浏览(182)

我正在尝试参照这个GitHub仓库中的示例,创建一个基于PubSub模拟器的集成测试,示例代码如下:

/**
 * Integration tests that publish to and pull from a Pub/Sub emulator started by Testcontainers.
 *
 * <p>The emulator endpoint is registered as {@code spring.cloud.gcp.pubsub.emulator-host}, and the
 * topic and subscription used by the tests are created once before any test runs.
 */
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
  private static final String PROJECT_ID = "test-project";
  private static final String TOPIC = "test-topic";
  private static final String SUBSCRIPTION = "test-subscription";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  /** Points the application's Pub/Sub emulator-host property at the running container. */
  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  /**
   * Creates the topic and subscription used by the tests on the emulator.
   *
   * @throws Exception if the admin clients cannot be created or closed
   */
  @BeforeAll
  static void setup() throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
            .usePlaintext()
            .build();
    try {
      TransportChannelProvider channelProvider =
          FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

      TopicAdminClient topicAdminClient =
          TopicAdminClient.create(
              TopicAdminSettings.newBuilder()
                  .setCredentialsProvider(NoCredentialsProvider.create())
                  .setTransportChannelProvider(channelProvider)
                  .build());

      SubscriptionAdminClient subscriptionAdminClient =
          SubscriptionAdminClient.create(
              SubscriptionAdminSettings.newBuilder()
                  .setTransportChannelProvider(channelProvider)
                  .setCredentialsProvider(NoCredentialsProvider.create())
                  .build());

      PubSubAdmin admin =
          new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
      try {
        admin.createTopic(TOPIC);
        admin.createSubscription(SUBSCRIPTION, TOPIC);
      } finally {
        // Close the admin even when topic/subscription creation fails.
        admin.close();
      }
    } finally {
      // Always release the gRPC channel, including when client creation throws.
      channel.shutdown();
    }
  }

  // By default, autoconfiguration will initialize application default credentials.
  // For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;

  @Autowired PubSubSubscriberTemplate subscriberTemplate;
  @Autowired PubSubPublisherTemplate publisherTemplate;

  /** Sends one message through {@code sender} and checks that exactly that message is pulled. */
  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> msgs =
        await().until(() -> subscriberTemplate.pull(SUBSCRIPTION, 10, true), not(empty()));

    assertEquals(1, msgs.size());
    assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

    for (AcknowledgeablePubsubMessage msg : msgs) {
      msg.ack();
    }
  }

  /** Publishes one message and checks that a started {@code PubSubWorker} hands it over. */
  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = publisherTemplate.publish(TOPIC, "hi!");

    List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker = new PubSubWorker(SUBSCRIPTION, subscriberTemplate, messages::add);
    worker.start();
    try {
      await().until(() -> messages, not(empty()));
      assertEquals(1, messages.size());
      assertEquals(future.get(), messages.get(0).getMessageId());
      assertEquals("hi!", messages.get(0).getData().toStringUtf8());
    } finally {
      // Stop the worker even when an assertion fails so it cannot keep running into later tests.
      worker.stop();
    }
  }

  @AfterEach
  void teardown() {
    // Drain any messages that are still in the subscription so that they don't interfere with
    // subsequent tests.
    await().until(() -> subscriberTemplate.pullAndAck(SUBSCRIPTION, 1000, true), hasSize(0));
  }
}

对于上面的例子,一切都很好,但是当我想测试我的实现时,如下所示

// Injected catalog; the test below uses it to look up the function under test.
@Autowired
    private FunctionCatalog catalog;

    /**
     * Invokes the function looked up from the catalog with a Base64-encoded payload, then pulls
     * from "test-subscription" and checks what was published.
     */
    @Test
    void testSendB() throws ExecutionException, InterruptedException {
        Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
        var pubSubMessage = new PubSubMessage();
        pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
        function.accept(pubSubMessage);

        List<AcknowledgeablePubsubMessage> msgs =
                await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

        assertEquals(1, msgs.size());
        // No message-id assertion here: this test never obtains a publish future (the publish
        // happens inside the function under test), so there is no id to compare against.
        // NOTE(review): "hello!" appears to be carried over from the sample test; confirm that it
        // is what the function under test actually publishes.
        assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

        for (AcknowledgeablePubsubMessage msg : msgs) {
            msg.ack();
        }
    }

它将抛出错误:

java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).

我的服务实现使用的是Publisher,而不是示例中的PubSubPublisherTemplate:

// Publisher that publish(...) sends messages through; how it is created or injected is not shown here.
private final Publisher publisher;

    /**
     * Publishes {@code message} as UTF-8 encoded bytes and waits for the publish to complete.
     *
     * @param message text to publish
     * @throws RuntimeException wrapping whatever exception was raised while publishing
     */
    public void publishMessage(String message) {
        var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
        var pubsubApiMessage = getPubsubApiMessage(byteStr);

        try {
            publish(pubsubApiMessage);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            log.error("Error during event publishing: " + e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    // Hands the message to the publisher and blocks until the returned future completes.
    private void publish(PubsubMessage pubsubApiMessage) throws Exception {
        var pending = publisher.publish(pubsubApiMessage);
        pending.get();
    }

    // Builds a PubsubMessage whose data payload is the given bytes.
    private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
        var builder = PubsubMessage.newBuilder();
        builder.setData(byteStr);
        return builder.build();
    }

这段代码部署到GCP后工作正常,但在使用PubSub模拟器进行集成测试时却不行。

5w9g7ksd

5w9g7ksd1#

针对PubSub模拟器,需要单独创建一个指向模拟器的测试用Publisher,可以在配置类中将其声明为bean。示例:

/**
 * Spring configuration that builds Pub/Sub clients (publisher, subscriber stub and admin clients)
 * on top of a plaintext gRPC channel to the host configured in
 * {@code spring.cloud.gcp.pubsub.emulator-host}, without credentials.
 */
@Configuration
public class PubSubConfig {
   @Value("${gcp.pubsub.topic.name}")
   private String topicName;

   @Value("${gcp.project.id}")
   private String projectId;

   @Value("${spring.cloud.gcp.pubsub.emulator-host}")
   private String host;

   // Shared by every client below so they all use the same no-credentials provider.
   private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create();

   /**
    * Creates a subscriber stub that talks over the given channel provider.
    *
    * @throws IOException if the stub or its settings cannot be created
    */
   @Bean
   public SubscriberStub testSubscriber(
         FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      var settings = SubscriberStubSettings.newBuilder()
                                           .setTransportChannelProvider(fixedTransportChannelProvider)
                                           .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                           .build();
      return GrpcSubscriberStub.create(settings);
   }

   /**
    * Creates the primary {@code Publisher} for the configured project and topic, using the given
    * channel provider.
    *
    * @throws IOException if the publisher cannot be built
    */
   @Primary
   @Bean
   public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName))
                      .setChannelProvider(fixedTransportChannelProvider)
                      .setCredentialsProvider(CREDENTIALS_PROVIDER)
                      .build();
   }

   /**
    * Creates a topic admin client that talks over the given channel provider.
    *
    * @throws IOException if the client or its settings cannot be created
    */
   @Bean
   public TopicAdminClient getTopicAdminClient(
         FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      var settings = TopicAdminSettings.newBuilder()
                                       .setTransportChannelProvider(fixedTransportChannelProvider)
                                       .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                       .build();
      return TopicAdminClient.create(settings);
   }

   /** Builds the channel provider from a plaintext gRPC channel to the configured host. */
   @Primary
   @Bean
   public FixedTransportChannelProvider getChannelProvider() {
      var channel = ManagedChannelBuilder.forTarget(host)
                                         .usePlaintext()
                                         .build();
      return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
   }

   /**
    * Creates a subscription admin client that talks over the given channel provider.
    *
    * @throws IOException if the client or its settings cannot be created
    */
   @Bean
   public SubscriptionAdminClient createSubscriptionAdmin(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      var settings = SubscriptionAdminSettings.newBuilder()
                                              .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                              .setTransportChannelProvider(fixedTransportChannelProvider)
                                              .build();
      return SubscriptionAdminClient.create(settings);
   }

}

相关问题