RabbitMQ Java流客户端和RabbitMQ Kubernetes操作符

t9aqgxwy  于 2023-04-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(146)

我使用RabbitMQ Operator部署了一个Kubernetes集群,并激活了rabbitmq_stream插件。这是我的yaml:

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-deployment
  namespace: rabbitmq-namespace
spec:
  replicas: 2
  image: rabbitmq:3.11.13
  persistence:
    storage: 20Gi
  service:
    type: LoadBalancer
  rabbitmq:
    additionalPlugins:
      - rabbitmq_stream
      - rabbitmq_stream_management

我还使用RabbitMQ Java流客户端,并像这样连接到集群:

EnvironmentBuilder environmentBuilder = Environment.builder();
environmentBuilder.host(System.getenv("RABBITMQ_HOST"));
environmentBuilder.port(Integer.parseInt(System.getenv("RABBITMQ_STREAM_PORT")));
environmentBuilder.username(System.getenv("RABBITMQ_USERNAME"));
environmentBuilder.password(System.getenv("RABBITMQ_PASSWORD"));
mainConnection = environmentBuilder.build();

现在,当我使用这个客户端创建流时,它的工作完美无瑕,没有错误报告:

mainConnection.streamCreator().stream("mystream").maxAge(Duration.of(1, ChronoUnit.DAYS)).create()

现在当我尝试生成这样的消息时:

Producer producer = RabbitMQStreamConnection.mainConnection.producerBuilder().stream("mystream").build();
byte[] messagePayload = "hello".getBytes(StandardCharsets.UTF_8);
producer.send(
    producer.messageBuilder().addData(messagePayload).build(),
    confirmationStatus -> {
        if (confirmationStatus.isConfirmed()) {
            // the message made it to the broker
        } else {
            // the message did not make it to the broker
        }
});

它抛出这个异常:

com.rabbitmq.stream.StreamException
Error while creating stream connection to rabbitmq-deployment-server-0.rabbitmq-deployment-nodes.rabbitmq-namespace:5552

当然,因为有两个节点(replicas = 2),流量似乎被直接重定向。
我想要的是,我可以从流中生产和消费消息。
现在,我不知道下一步该怎么做来解决这个问题。

chy5wohz

chy5wohz1#

您应该使用负载平衡器配置。
参见:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#when-a-load-balancer-is-in-use
当客户端试图连接到承载流领导者和副本的节点时,负载均衡器可能会误导客户端。“连接到流”博客文章介绍了为什么客户端应用程序必须连接到集群中的适当节点,以及负载均衡器如何使事情变得复杂。
EnvironmentBuilder#addressResolver(AddressResolver)方法允许在元数据提示之后和连接之前拦截节点解析。应用程序可以使用此钩子忽略元数据提示并始终使用负载均衡器,如以下片段所示:
使用自定义地址解析器以始终使用负载平衡器

Address entryPoint = new Address("my-load-balancer", 5552);  
Environment environment = Environment.builder()
    .host(entryPoint.host())  
    .port(entryPoint.port())  
    .addressResolver(address -> entryPoint)  
    .build();

相关问题