Unable to start the RabbitMQ source in Flink 1.3.2

2skhul33 posted on 2021-06-25 in Flink

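I want to start a RabbitMQ source first and then a sink, but I cannot get past the first step, starting the RabbitMQ source. The RabbitMQ server is running, and I can see its dashboard.
My code is as follows:
public class rabbitmq_source {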

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.getExecutionEnvironment();

    RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(50000)
            .setUserName("root")
            .setPassword("root")
            .setVirtualHost("/")
            .build();

    DataStream<String> stream = envrionment
            .addSource(new RMQSource<String>(
                    connectionConfig,            // config for the RabbitMQ connection
                    "queue",                     // name of the RabbitMQ queue to consume
                    new SimpleStringSchema()));

    stream.print();

    envrionment.execute();

}

}
I am not sure which username and password I should set; should they be guest and guest? In any case, I get the following error:

java.lang.RuntimeException: Cannot create RMQ connection with queue at localhost
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:588)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:612)
jfewjypa #1

Use LocalStreamEnvironment.createLocalEnvironment(), and set both the username and the password to guest.
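
The "Connection refused" cause in the stack trace means that nothing accepted a TCP connection on localhost:50000, the port passed to setPort in the question. A default RabbitMQ installation listens for AMQP clients on port 5672 (the management dashboard is served on a separate port, 15672 by default), so the port may be worth checking along with the credentials. The sketch below is a JDK-only probe and is not part of Flink or the RabbitMQ client; the class name PortProbe and the helper isListening are hypothetical names introduced for illustration, and it assumes the broker runs on the same machine as the job.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /**
     * Returns true if a TCP connection to host:port succeeds within
     * timeoutMillis, and false if it is refused, times out, or fails
     * with any other I/O error.
     */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused" as well as connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "localhost";
        // 50000 is the port from the question; 5672 is RabbitMQ's default AMQP port.
        int[] ports = {50000, 5672};
        for (int port : ports) {
            System.out.println(host + ":" + port
                    + " listening = " + isListening(host, port, 1000));
        }
    }
}
```

If the probe reports that only 5672 is listening, changing setPort(50000) to setPort(5672) in the RMQConnectionConfig builder would be the next thing to try.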
