如何在Java中处理Apache Mina SSHD SFTP服务器中的传入文件

pxiryf3j  于 2022-10-15  发布在  Java
关注(0)|答案(4)|浏览(293)

目前我正在研究SFTP协议。我已经使用Jsch库创建了SFTP客户端,并使用Apache Mina Sshd库创建了SFTP服务器。我已经在它们之间建立了连接,可以成功地将文件发送到SFTP服务器。现在我正在创建一个SFTP服务器端文件处理程序,用于处理传入的文件。例如,SFTP服务器可以从SFTP客户端接收文件,但目前在我的当文件到达服务器时,无法通知。我只需转到服务器根文件夹,看看是否有可用的文件。这就是我如何知道文件是否到达的方法。
我想实现的是,当文件到达服务器时,它会通知用户文件到达和文件内容。(文件名和其他详细信息)。但问题是我是Apache Mina sshd API的新手。我已经阅读了文档,但我无法理解。
请告诉我,如果Apache Mina Sshd服务器中有任何已实现的侦听器用于处理传入文件,或者如果没有,我如何实现自己的侦听器来处理传入文件。

SFTP服务器代码

public class SftpServerStarter {

    private SshServer sshd;
    private final static Logger logger = LoggerFactory.getLogger(SftpServerStarter.class);

    public void start(){

        sshd = SshServer.setUpDefaultServer();
        sshd.setPort(22);
        sshd.setHost("localhost");

        sshd.setPasswordAuthenticator(new MyPasswordAuthenticator());
        sshd.setPublickeyAuthenticator(new MyPublickeyAuthenticator());
        sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
        sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystem.Factory()));
        sshd.setCommandFactory(new ScpCommandFactory());
        sshd.setFileSystemFactory(new VirtualFileSystemFactory("C:/root"));

        try {
            logger.info("Starting ...");
            sshd.start();
            logger.info("Started");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            logger.info("Can not Start Server");
        }
    }

}
pbgvytdp

pbgvytdp1#

我开始使用@gihan的建议,但在用户将文件上载到某些客户端之前,我在文件监视器处理文件时遇到了一些问题。
这是我在米娜的源代码中找到的解决方案。尽管Apache Mina网站上的文档稀少、无用,但我认为这是他们打算让开发人员使用其库的方式。

注意:****由于您的需求可能与我的不同,请记住,这可能不是一种复制粘贴解决方案您可能需要调整此代码以满足您的需求*,但我非常有信心,此代码确实为您所寻找的解决方案提供了关键*
步骤1:实现SftpEventListener

创建自己的类来实现org.apache.sshd.server.subsystem.sftp.SftpEventListener。下面是我的示例**。我的实现被设置为每当新上载或覆盖文件时运行一系列注册的FileUploadCompleteListener方法,并阻止用户尝试导航或创建目录。

public class SFTPServiceSFTPEventListener implements SftpEventListener {

    Logger logger = Logger.getLogger(SFTPServiceSFTPEventListener.class);

    SFTPService service;

    public SFTPServiceSFTPEventListener(SFTPService service) {
        this.service = service;
    }

    public interface FileUploadCompleteListener {
        void onFileReady(File file);
    }

    private List<FileUploadCompleteListener> fileReadyListeners = new ArrayList<FileUploadCompleteListener>();

    public void addFileUploadCompleteListener(FileUploadCompleteListener listener) {
        fileReadyListeners.add(listener);
    }

    public void removeFileUploadCompleteListener(FileUploadCompleteListener listener) {
        fileReadyListeners.remove(listener);
    }

    @Override
    public void initialized(ServerSession serverSession, int version) {

    }

    @Override
    public void destroying(ServerSession serverSession) {

    }

    @Override
    public void open(ServerSession serverSession, String remoteHandle, Handle localHandle) {
        File openedFile = localHandle.getFile().toFile();
        if (openedFile.exists() && openedFile.isFile()) {
        }
    }

    @Override
    public void read(ServerSession serverSession, String remoteHandle, DirectoryHandle localHandle, Map<String,Path> entries) {

    }

    @Override
    public void read(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, byte[] data, int dataOffset, int dataLen, int readLen) {

    }

    @Override
    public void write(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, byte[] data, int dataOffset, int dataLen) {
    }

    @Override
    public void blocking(ServerSession serverSession,  String remoteHandle, FileHandle localHandle, long offset, long length, int mask) {
    }

    @Override
    public void blocked(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length, int mask, Throwable thrown) {
    }

    @Override
    public void unblocking(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length) {
    }

    @Override
    public void unblocked(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length, Boolean result, Throwable thrown) {
    }

    @Override
    public void close(ServerSession serverSession, String remoteHandle, Handle localHandle) {
        File closedFile = localHandle.getFile().toFile();
        if (closedFile.exists() && closedFile.isFile()) {
            logger.info(String.format("User %s closed file: \"%s\"", serverSession.getUsername(), localHandle.getFile().toAbsolutePath()));
            this.service.UserWroteFile(serverSession.getUsername(), localHandle.getFile());

            for (FileUploadCompleteListener fileReadyListener : fileReadyListeners) {
                fileReadyListener.onFileReady(closedFile);
            }
        }
    }

    @Override
    public void creating(ServerSession serverSession, Path path, Map<String,?> attrs) throws UnsupportedOperationException {
        logger.warn(String.format("Blocked user %s attempt to create a directory \"%s\"", serverSession.getUsername(), path.toString()));
        throw new UnsupportedOperationException("Creating sub-directories is not permitted.");
    }

    @Override
    public void created(ServerSession serverSession, Path path, Map<String,?> attrs, Throwable thrown) {
        String username = serverSession.getUsername();
        logger.info(String.format("User %s created: \"%s\"", username, path.toString()));
        service.UserWroteFile(username, path);
    }

    @Override
    public void moving(ServerSession serverSession, Path path, Path path1, Collection<CopyOption> collection) {

    }

    @Override
    public void moved(ServerSession serverSession, Path source, Path destination, Collection<CopyOption> collection, Throwable throwable) {
        String username = serverSession.getUsername();
        logger.info(String.format("User %s moved: \"%s\" to \"%s\"", username, source.toString(), destination.toString()));
        service.UserWroteFile(username, destination);
    }

    @Override
    public void removing(ServerSession serverSession, Path path) {

    }

    @Override
    public void removed(ServerSession serverSession, Path path, Throwable thrown) {

    }

    @Override
    public void linking(ServerSession serverSession, Path source, Path target, boolean symLink) throws UnsupportedOperationException {
        logger.warn(String.format("Blocked user %s attempt to create a link to \"%s\" at \"%s\"", serverSession.getUsername(), target.toString(), source.toString()));
        throw new UnsupportedOperationException("Creating links is not permitted");
    }

    @Override
    public void linked(ServerSession serverSession, Path source, Path target, boolean symLink, Throwable thrown) {

    }

    @Override
    public void modifyingAttributes(ServerSession serverSession, Path path, Map<String,?> attrs) {

    }

    @Override
    public void modifiedAttributes(ServerSession serverSession, Path path, Map<String,?> attrs, Throwable thrown) {
        String username = serverSession.getUsername();
        service.UserWroteFile(username, path);
    }
}

步骤2:将侦听器的示例添加到服务器

实现类后,只需示例化它并使用SftpSubsystemFactory将其添加到服务器,然后在服务器上调用start()

// Your SSHD Server
SshServer sshd = SshServer.setUpDefaultServer();

SftpSubsystemFactory sftpSubsystemFactory= new SftpSubsystemFactory();

// This is where to put your implementation of SftpEventListener
SFTPServiceSFTPEventListener sftpEventListener = new SFTPServiceSFTPEventListener(this);
sftpEventListener.addFileUploadCompleteListener(new SFTPServiceSFTPEventListener.FileUploadCompleteListener() {
    @Override
    public void onFileReady(File file) {
        try {
            doThingsWithFile(file);
        } catch (Exception e) {
            logger.warn(String.format("An error occurred while attempting to do things with the file: \"%s\"", file.getName()), e);
        }
    }
});
sftpSubsystemFactory.addSftpEventListener(sftpEventListener);

List<NamedFactory<Command>> namedFactoryList = new ArrayList<NamedFactory<Command>>();
namedFactoryList.add(sftpSubsystemFactory);
sshd.setSubsystemFactories(namedFactoryList);

// Do your other init stuff...

sshd.start();

完成此操作后,SftpEventListener的实现将开始自动响应已实现的事件。我的基本上只响应用户何时关闭文件(当文件上载完成时发生),但正如我所说,您可以自由地实现其他方法来响应其他事件

pvcm50d1

pvcm50d12#

我终于找到了一个解决方案,但它不是来自Apache Mina SSHD API。这是一个概念:我们可以监视服务器的根目录中的文件更改。如果服务器文件夹中有更改的文件,则会触发事件。有很多API可以做到这一点。在我的代码片段中,我使用的是org.apache.commons.io.monitor

SFTPFileListner类

public static void startMonitor(String rootFolder) throws Exception {

        //every 5 seconds it will start monitoring
        final long pollingInterval = 5 * 1000;

        File folder = new File(rootFolder);

        if (!folder.exists()) {

            throw new RuntimeException("ERROR : Server root directory not found: " + rootFolder);
        }

        FileAlterationObserver observer = new FileAlterationObserver(folder);
        FileAlterationMonitor monitor = new FileAlterationMonitor(pollingInterval);
        FileAlterationListener listener = new FileAlterationListenerAdaptor() {

            @Override
            public void onFileCreate(File file) {
                try {

                    System.out.println("[SFTPFileListner] Received :"+ file.getName());
                    System.out.println("[SFTPFileListner] Received File Path :"+ file.getCanonicalPath());

                } catch (IOException e) {
                    throw new RuntimeException("ERROR: Unrecoverable error when creating files " + e.getMessage(),e);
                }
            }

        };

        observer.addListener(listener);
        monitor.addObserver(observer);
        monitor.start();
    }

创建监控器类后,可以在SFTP服务器类中调用实现的方法。

SFTP服务器类

//pass server root directory 
SFTPFileListner.startMonitor("C:/root");
cfh9epnr

cfh9epnr3#

正如我在另一篇帖子中提到的,Mina没有直接为我们提供在接收或部分接收传入文件时处理触发器的能力,但我们的需求非常具体。所以我们唯一的选择是走出Mina,上面的解决方案就是这么做的。这可能值得作为Mina的拉请求/特性来推动,或者作为开源补充解决方案进一步开发。我认为,当服务器中出现问题时,人们会面临一个主动通知/触发系统的常见问题。祝你的发展顺利!

vngu2lb8

vngu2lb84#

请问您将如何实现我的Publickeyauthenticator?

相关问题