akka 阿尔帕克卡AMQP:如何检测申报异常?

i7uq4tfw  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(163)

我有一个带有声明的AMQP源和AMQP接收器:

List<Declaration> declarations = new ArrayList<Declaration>() {{
      add(QueueDeclaration.create(sourceExchangeName));
      add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
    }};

amqpSource = AmqpSource
    .committableSource(
        NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
            .withDeclarations(declarations),
        bufferSize);

AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
    .withExchange("DEST_XCHANGE")
    .withRoutingKey("ROUTE123")
    .withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
        BuiltinExchangeType.DIRECT.getType()));

amqpSink = AmqpSink.create(amqpWriteSettings);

然后我有一个流..

amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)

现在,在我启动应用程序后,发送到源队列的消息没有被使用。我后来发现这是由于声明期间发生的错误。(即,当我删除源和接收器设置中的.withDeclarations(..)时,它工作正常。
所以我的问题:
1.如何检测AMQP源和接收器是否正常启动和运行?
1.如何忽略声明异常?
1.如果出现任何异常,我如何知道并使系统失效?

bnlyeluc

bnlyeluc1#

为了回答1和3,AmqpSink具体化了一个CompletionStage<Done>,您必须保留并处理(注册一些回调函数)该x1m1 n1,以观察流的失败和完成。在docs示例中,我们在完成阶段阻塞,这在生产代码中是不好的(https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink),这可能是因为该示例包含在Alpakka测试之一中。首选常用的CompletionStage回调/转换方法(参见例如this introduction)。
CompletionStage会在错误发生时失败,当流被实体化/启动时,或者在元素处理过程中,或者在源到达末尾并且每个元素都通过流进入接收器时完成。这意味着对于启动流,如果它没有很快失败,它就在运行。
对于问题2,不确定是否可以忽略声明异常,原因可能是这些异常总是导致连接失败。

相关问题