Broken pipe in RabbitMQ ConnectionFactory.newConnection()

Posted by xmakbtuz on 2022-11-08 in RabbitMQ

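On rare occasions, when the load is heavier than usual, the RabbitMQ application starts returning SocketException: Broken pipe (and essentially stops processing any further messages).
The system uses the RPC pattern: workers listen on a few predefined queues for jobs, and clients submit tasks to those queues. Each client also opens a temporary auto-delete queue, specifies it as the replyTo queue, and listens on it for the reply (using a correlation ID to match messages).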
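The client's matching code is not shown in the post, so the following is only a sketch of how replies could be paired with requests by correlation ID. `ReplyMatcher`, `expect`, and `onReply` are hypothetical names, and plain strings stand in for message bodies so that it runs without a broker.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Matches replies to outstanding requests by correlation ID (hypothetical helper). */
final class ReplyMatcher {

    // Requests that have been sent but not yet answered, keyed by correlation ID.
    private final ConcurrentHashMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    /** Registers an outstanding request and returns the future its reply will complete. */
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    /** Delivers a reply body to the matching request; returns false if no request is waiting for it. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false; // unknown or already answered ID: drop the reply
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyMatcher matcher = new ReplyMatcher();
        String idA = UUID.randomUUID().toString();
        String idB = UUID.randomUUID().toString();
        CompletableFuture<String> a = matcher.expect(idA);
        CompletableFuture<String> b = matcher.expect(idB);

        // Replies may arrive in any order; the ID decides which caller gets which body.
        matcher.onReply(idB, "result for B");
        matcher.onReply(idA, "result for A");

        System.out.println(a.join()); // prints "result for A"
        System.out.println(b.join()); // prints "result for B"
    }
}
```

In a real client, `onReply` would presumably be called from the consumer on the replyTo queue, with the ID taken from the message properties.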
The code that actually triggers the Broken pipe is very simple. It is on the client side and is basically:

factory = new ConnectionFactory();
factory.setUri(uri);
connection = factory.newConnection(); // this is when we get the exception

The exception is as follows:

2013-09-06 21:37:03,947 +0000 [http-bio-8080-exec-350] ERROR RabbitRpcClient:79  - IOException 
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:142)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:488)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:383)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)     
    ...

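I think this usually coincides with the workers taking longer than usual to process jobs, so more temporary client queues are open at the same time (roughly 20-30?). As far as I can tell, though, I am not hitting any of the usual watermarks (memory, disk); I may be hitting some limit I am not aware of.
I have reviewed the Rabbit logs, and the only type of error I found is: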

=ERROR REPORT==== 6-Sep-2013::21:36:59 ===
closing AMQP connection <0.3105.1297> (10.118.69.132:42582 -> 10.12.111.134:5672):
{handshake_timeout,frame_header}

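I looked at both logs: the first "Broken pipe" on the client is at 21:37:03, while the first ERROR of any kind in the RabbitMQ log is at 21:36:59, followed by the same type of error at regular intervals until the system was restarted. So I believe the entry posted above is the corresponding log entry.
I am using Rabbit Java client 3.1.4 (the latest on Maven Central) with Rabbit server 3.1.4 running on Amazon Linux on AWS EC2.
Below is the rabbitmqctl status under normal conditions (unfortunately not during the failure; I will try to capture one the next time it happens):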

Status of node 'rabbit@ip-some-ip' ...
[{pid,2654},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.1.4"},
  {rabbitmq_management_agent,"RabbitMQ Management Agent","3.1.4"},
  {rabbit,"RabbitMQ","3.1.4"},
  {os_mon,"CPO  CXC 138 46","2.2.7"},
  {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.1.4"},
  {webmachine,"webmachine","1.10.3-rmq3.1.4-gite9359c7"},
  {mochiweb,"MochiMedia Web Server","2.7.0-rmq3.1.4-git680dba8"},
  {xmerl,"XML parser","1.2.10"},
  {inets,"INETS  CXC 138 49","5.7.1"},
  {mnesia,"MNESIA  CXC 138 12","4.5"},
  {amqp_client,"RabbitMQ AMQP Client","3.1.4"},
  {sasl,"SASL  CXC 138 11","2.1.10"},
  {stdlib,"ERTS  CXC 138 10","1.17.5"},
  {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,
 "Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2]     [async-threads:30] [kernel-poll:true]\n"},
{memory,
 [{total,331967824},
  {connection_procs,5389784},
  {queue_procs,2669016},
  {plugins,654768},
  {other_proc,10063336},
  {mnesia,90352},
  {mgmt_db,2706344},
  {msg_index,7148168},
  {other_ets,3495648},
  {binary,1952040},
  {code,17696200},
  {atom,1567425},
  {other_system,278534743}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,3126832332},
{disk_free_limit,1000000000},
{disk_free,1487147008},
{file_descriptors,
 [{total_limit,349900},
  {total_used,71},
  {sockets_limit,314908},
  {sockets_used,66}]},
{processes,[{limit,1048576},{used,930}]},
{run_queue,0},
 {uptime,5680}]
 ...done.
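As a rough sanity check on the figures above, the sketch below computes how close each resource is to its limit. The numbers are copied from the status output, which was taken under normal load, so they say nothing about the failure window itself. `HeadroomCheck` and `percentOf` are hypothetical names.

```java
/** Compares the usage figures from the status output against their limits. */
final class HeadroomCheck {

    /** Returns `used` as a percentage of `limit`. */
    static double percentOf(long used, long limit) {
        return 100.0 * used / limit;
    }

    public static void main(String[] args) {
        // Values copied from the rabbitmqctl status output above.
        long memoryTotal = 331_967_824L;     // {memory,[{total,...}]}
        long memoryLimit = 3_126_832_332L;   // vm_memory_limit
        long diskFree = 1_487_147_008L;      // disk_free
        long diskFreeLimit = 1_000_000_000L; // disk_free_limit
        long socketsUsed = 66L;              // sockets_used
        long socketsLimit = 314_908L;        // sockets_limit

        System.out.println("memory used, % of limit:  " + percentOf(memoryTotal, memoryLimit));
        System.out.println("sockets used, % of limit: " + percentOf(socketsUsed, socketsLimit));
        System.out.println("disk bytes above limit:   " + (diskFree - diskFreeLimit));
    }
}
```

Memory works out to about 10.6% of its limit and sockets to about 0.02%. Free disk space is 487,147,008 bytes above its limit, which is the tightest margin of the three.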

Any ideas what might be wrong, or at least what I can do to debug this and get a clearer picture of what is happening?

n6lpvg4x  #1

I changed the code to reuse the Connection object, in fact even sharing it across multiple threads, and the problem no longer seems to recur (fingers crossed).
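The changed code is not shown, so this is only a sketch of the idea: open the connection once and hand the same instance to every thread, instead of calling newConnection() per request. `SharedConnectionSketch`, `SharedHolder`, and `countOpens` are hypothetical names, and a plain Object stands in for the connection so that the sketch runs without a broker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedConnectionSketch {

    /** Opens the resource on first use, then returns the same instance to every caller. */
    static final class SharedHolder<T> {
        private final Callable<T> opener;
        private T instance; // guarded by "this"

        SharedHolder(Callable<T> opener) {
            this.opener = opener;
        }

        synchronized T get() throws Exception {
            if (instance == null) {
                instance = opener.call(); // the expensive open happens only once
            }
            return instance;
        }
    }

    /** Runs `threads` concurrent callers against one holder and returns how often the opener ran. */
    static int countOpens(int threads) {
        AtomicInteger opens = new AtomicInteger();
        // Stand-in for the real opener; a plain Object plays the connection.
        SharedHolder<Object> holder = new SharedHolder<>(() -> {
            opens.incrementAndGet();
            return new Object();
        });
        Callable<Object> task = holder::get;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(task);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for callers", e);
        }
        return opens.get();
    }

    public static void main(String[] args) {
        System.out.println("opens = " + countOpens(8)); // prints "opens = 1"
    }
}
```

With the RabbitMQ Java client, the opener would presumably be `factory::newConnection`, and each thread would then create its own Channel from the shared Connection via `createChannel()` rather than sharing a single Channel.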

x9ybnkn6  #2

package com.rm.rabbitmq.tls;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.*;
import javax.net.ssl.*;

import com.rabbitmq.client.*;

public class Example2 {

    public static void main(String[] args) throws Exception {

        // Client key store (PKCS#12) holding the client certificate and private key.
        char[] keyPassphrase = "".toCharArray();
        KeyStore ks = KeyStore.getInstance("PKCS12");
        try (InputStream in = new FileInputStream("/Users/global/Documents/tls-gen/basic/result/client_key.p12")) {
            ks.load(in, keyPassphrase);
        }

        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(ks, keyPassphrase);

        // Trust store (JKS) holding the certificate(s) the client should trust.
        char[] trustPassphrase = "welcome".toCharArray();
        KeyStore tks = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream("/Users/global/Documents/tls-gen/basic/result/rabbitstore")) {
            tks.load(in, trustPassphrase);
        }

        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        // TLS context built from the key and trust managers above.
        SSLContext c = SSLContext.getInstance("TLSv1.3");
        c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        // Connect to the broker's TLS port.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5671);
        factory.useSslProtocol(c);
        //factory.enableHostnameVerification();

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // Non-durable, exclusive, auto-delete queue used only for this round trip.
        channel.queueDeclare("rabbitmq-java-test", false, true, true, null);
        channel.basicPublish("", "rabbitmq-java-test", null, "Hello, World".getBytes(StandardCharsets.UTF_8));

        // Fetch the message back with a single basic.get (no auto-ack).
        GetResponse chResponse = channel.basicGet("rabbitmq-java-test", false);
        if (chResponse == null) {
            System.out.println("No message retrieved");
        } else {
            byte[] body = chResponse.getBody();
            System.out.println("Received: " + new String(body, StandardCharsets.UTF_8));
        }

        channel.close();
        conn.close();
    }
}
