Camel -当使用目录不存在时停止路由

htrmnn0y  于 2022-11-07  发布在  Apache
关注(0)|答案(4)|浏览(187)

我有一个SFTP路由(在Spring XML中),它的from路径结束于一个每天变化的目录(即/yyyyMMdd),当autoCreate=true或路由开始时目录存在时一切都很好。但如果目录不存在,则不允许我创建目录!
当目录存在时,路由会获取文件并自行终止。
当dir不存在时,路由将永久轮询并发出警告(即org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: 20160917),并且永不停止。
我如何避免这种行为(例如,将警告转换为空消息或异常或...)?我已经做了一些实验,使用startingDirectoryMustExist,consumer.bridgeErrorHandler和许多其他方法,但都没有成功。
简化的路由(在开始之前,用实际日期填充elmu.sftp.importDir属性):

<from
        uri="sftp://{{elmu.sftp.host}}:{{elmu.sftp.port}}{{elmu.sftp.importDir}}?username={{elmu.sftp.userName}}&amp;password={{elmu.sftp.password}}&amp;
        autoCreate=false&amp;preferredAuthentications=password&amp;binary=true&amp;include={{elmu.importMask}}&amp;initialDelay=100&amp;
        noop=true&amp;sortBy=file:name&amp;sendEmptyMessageWhenIdle=true"/>
    <choice>
        <when>
            <simple>${body} != null</simple>
            ... a lot of stuff ...
            <to uri="bean:shutdownRoute" />
        </when>
        <otherwise>
            <to uri="bean:shutdownRoute" />
        </otherwise>
    </choice>

如果使用directoryMustExist=true and startingDirectoryMustExist=true,则结果是一个无限循环(轮询),并显示以下警告:

08:30:14,658 WARN  SftpConsumer - Consumer Consumer[sftp://xxx.xxx.xx:22/DBHtest/ELMUteszt/Kiadott_adatok/20160918?autoCreate=false&binary=true&directoryMustExist=true&include=%5E.*%24&initialDelay=100&noop=true&password=xxxxxx&preferredAuthentications=password&sendEmptyMessageWhenIdle=true&sortBy=file%3Aname&startingDirectoryMustExist=true&username=xxx] failed polling endpoint: Endpoint[sftp://xxx:22/DBHtest/ELMUteszt/Kiadott_adatok/20160918?autoCreate=false&binary=true&directoryMustExist=true&include=%5E.*%24&initialDelay=100&noop=true&password=xxxxxx&preferredAuthentications=password&sendEmptyMessageWhenIdle=true&sortBy=file%3Aname&startingDirectoryMustExist=true&username=xxx]. Will try again at next poll. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot change directory to: 20160918] 
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: 20160918
    at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:576)
    at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:564)
    at org.apache.camel.component.file.remote.SftpConsumer.doPollDirectory(SftpConsumer.java:107)
    at org.apache.camel.component.file.remote.SftpConsumer.pollDirectory(SftpConsumer.java:79)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:131)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: 2: No such file
    at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2846)
    at com.jcraft.jsch.ChannelSftp._realpath(ChannelSftp.java:2340)
    at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:342)
    at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:574)
    ... 13 more

它不适用于stepwise=false

11:52:19,210 WARN  SftpConsumer - Consumer Consumer[sftp://xxx:22/DBHtest/ELMUteszt/Kiadott_adatok/20160918?autoCreate=false&binary=true&directoryMustExist=true&include=%5E.*%24&initialDelay=100&noop=true&password=xxxxxx&preferredAuthentications=password&sendEmptyMessageWhenIdle=true&sortBy=file%3Aname&startingDirectoryMustExist=true&stepwise=false&username=xxx] failed polling endpoint: Endpoint[sftp://xxx:22/DBHtest/ELMUteszt/Kiadott_adatok/20160918?autoCreate=false&binary=true&directoryMustExist=true&include=%5E.*%24&initialDelay=100&noop=true&password=xxxxxx&preferredAuthentications=password&sendEmptyMessageWhenIdle=true&sortBy=file%3Aname&startingDirectoryMustExist=true&stepwise=false&username=xxx]. Will try again at next poll. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot list directory: DBHtest/ELMUteszt/Kiadott_adatok/20160918] 
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot list directory: DBHtest/ELMUteszt/Kiadott_adatok/20160918

更新(根据@ruffp的回答):
我试着设置了一个自定义的PollingConsumerPollStrategy,但是我无法从它那里停止路由。只有第三行(注解行)停止了路由,但是我有几个路由,我不知道实际路由的名称。我该如何获得它?

@Override
    public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
        consumer.getEndpoint().stop(); // 1
        consumer.stop();  // 2
        consumer.getEndpoint().getCamelContext().stopRoute(route???);  // 3
        return false;
    }
1tu0hz3e

1tu0hz3e1#

最后我用consumer.exceptionHandler解决了这个问题。但是看起来,这个选项不在可用选项列表中(我已经反复阅读了http://camel.apache.org/file2.html),只是在巨大页面的底部有一个例子提到过一次。不幸的是,它是如此的“隐藏”,以至于我到现在都没有看到它。
我设置了一个新类并实现了handleExceptions方法:

public class DirNotExistsExHandler implements ExceptionHandler

它获取异常并决定要做什么。在上下文中,我定义了一个bean:

<bean id="dirNotExistsExHandler" class="hu.dbit.eleo.DirNotExistsExHandler" />

在使用者中,将bean传递给处理程序:

consumer.exceptionHandler=#dirNotExistsExHandler

非常感谢您的帮助!

fkvaft9z

fkvaft9z2#

在我看来,最好的办法是:
1.在sftp端点上启用throwExceptionOnConnectFailed选项,如下所示:

sftp://{{elmu.sftp.host}}:{{elmu.sftp.port}}{{elmu.sftp.importDir}}?username={{elmu.sftp.userName}}&amp;password={{elmu.sftp.password}}&amp;
autoCreate=false&amp;preferredAuthentications=password&amp;binary=true&amp;include={{elmu.importMask}}&amp;initialDelay=100&amp;
noop=true&amp;sortBy=file:name&amp;sendEmptyMessageWhenIdle=true&amp;throwExceptionOnConnectFailed=true

这可以帮助管理异常,并通过 Camel 路径onException捕获它。但是,我不确定在您的情况下是否真的有必要。
1.对连接异常进行特殊处理

// 2a) Solution to redirect an empty body to a destination
onException(GenericFileOperationFailedException.class)
            .handled(true)
            .log(LoggingLevel.INFO, "Source directory not present: send empty body to shutdown route...")
            .setBody(null)
            .to("bean:shutdownRoute");

或者换个方式:

// 2b)Solution to just stop the processing without log in warn or error
onException(GenericFileOperationFailedException.class)
    .handled(true)
    .log(LoggingLevel.INFO, "Source directory not present: stop the process and wait until next time...")
    .stop();

更新:我找到了this post,显然除了实现您自己的PollingConsumerPollStrategy之外没有其他方法,因为GenericFileOperationFailedException显然是在默认实现中处理的。

baubqpgj

baubqpgj3#

使用doCatch区块行程例外状况。如果没有掷回例外状况,则使用您的程式码检查档案是否存在,然后手动掷回例外状况。
http://camel.apache.org/try-catch-finally.html

6ju8rftf

6ju8rftf4#

奥托·乔塔里:要在回滚方法中获取失败路由的路由ID,请使用以下代码片段

CamelContext context = endpoint.getCamelContext();
    List<Route> routes = context.getRoutes();
    SftpEndpoint sftpEndpoint = (SftpEndpoint)endpoint;
    Route failedRoute =  routes.stream().filter(route -> ((SftpEndpoint) route.getEndpoint())
            .getConfiguration().getDirectoryName().equals(sftpEndpoint.getConfiguration().getDirectoryName()))
            .findAny().orElse(null);

相关问题