如何使用lambda+java从s3收集文件并发送到外部sftp?

wmomyfyw  于 2021-06-30  发布在  Java
关注(0)|答案(2)|浏览(552)

我每天都有文件放在s3 bucket中,我需要创建lambda函数将这个文件转发到外部sftp((使用java)
我的问题是,我不知道如何从lambda建立与s3的连接来收集文件(最终编辑它,例如重命名),然后转发到sftp。如果我们需要调用另一个lambda函数,是否可以像我们这样调用它?例子。或者我必须像在aws环境之外那样连接?
如果你有一些建议或者一些简单的实现例子,甚至接近这个,那就太好了!

kq4fsx7k

kq4fsx7k1#

首先,您需要配置s3 bucket以向lambda函数发送新的对象事件。
在lambda函数中,可以将s3对象路径从事件对象中拉出。然后您需要使用awsdkforjava将文件从s3下载到lambda函数的 /tmp 文件夹。然后让函数在 /tmp 文件夹。最后,使用sftp库从 /tmp sftp服务器的文件夹。

toiithl6

toiithl62#

下面是从s3获取文件并将其保存到sftp的完整示例。
我添加了以下库以使此java模块正常工作:
λ核
谷歌gson
aws s3软件开发包
sshj公司

import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.xfer.FileSystemFile;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class S3ToSFTPTest implements RequestStreamHandler {

    private LambdaLogger logger;

    @Override
    public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
        logger = context.getLogger();

        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build();

        // s3 client
        if (s3Client == null) {
            logger.log("S3 Client is null - can't continue!");
            sendResponse(context, new S3ToSFTPTestResponse(false), output);
            return;
        }

        String bucketName = "==== S3 BUCKET NAME ====";

        // s3 bucket - make sure it exist
        if (!s3Client.doesBucketExistV2(bucketName)) {
            logger.log("S3 Bucket does not exists - can't continue!");
            sendResponse(context, new S3ToSFTPTestResponse(false), output);
            return;
        }

        String fileName = "==== S3 FILE NAME ====";
        File localFile = null;

        try {
            localFile = File.createTempFile(fileName, "");

            // get S3Object
            S3Object s3Object = s3Client.getObject(bucketName, fileName);

            // get stream from S3Object
            InputStream inputStream = s3Object.getObjectContent();

            // write S3Object stream into a temp file
            Files.copy(inputStream, localFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        } catch (Exception e) {
            logger.log("Failed to get file from S3: " + e.toString());
        }

        if(localFile == null) {
            sendResponse(context, new S3ToSFTPTestResponse(false), output);
            return;
        }

        // now, you have the file stored locally
        // modify it as you need to
        // .....

        // finally, send the file to SFTP
        boolean fileSaved = saveFilesToSFTP(context, localFile);
        logger.log("fileSaved: " + fileSaved);

        sendResponse(context, new S3ToSFTPTestResponse(true), output);
    }

    public boolean saveFilesToSFTP(Context context, File... files) {
        // this is for test only - In real application, I would suggest that
        // do NOT store these information in the code.
        // You should use service like Secrets Manager or Parameter Store
        final String sftpHostname = "==== SFTP Hostname ====";
        final String sftpUsername = "==== SFTP Username ====";
        final String sftpPassword = "==== SFTP Password ====";

        String remoteFolderPath = "/root/S3Files/";

        try {
            SSHClient ssh = new SSHClient();
            ssh.addHostKeyVerifier((hostname1, port, key) -> true);

            ssh.connect(sftpHostname);
            logger.log("SSHClient Connected!");

            try {
                ssh.authPassword(sftpUsername, sftpPassword);
                logger.log("SSHClient Authenticated!");

                try (SFTPClient sftp = ssh.newSFTPClient()) {
                    for(File file : files) {
                        sftp.put(new FileSystemFile(file), remoteFolderPath);
                    }
                } catch (Exception e) {
                    logger.log("failed to get SFTPClient: " + e.toString());
                    return false;
                }
            } finally {
                ssh.disconnect();
            }
        } catch (Exception e) {
            logger.log("SFTP connection failed: " + e.toString());
            return false;
        }

        return true;
    }

    public void sendResponse(Context context, S3ToSFTPTestResponse response, OutputStream output) {
        Gson gson = new GsonBuilder().setPrettyPrinting().create();
        String responseStr = gson.toJson(response);

        logger.log("response:\n" + responseStr);

        try {
            OutputStreamWriter writer = new OutputStreamWriter(output, StandardCharsets.UTF_8);
            writer.write(responseStr);
            writer.close();
        } catch (Exception e) {
            logger.log("failed to send response: " + e.toString());
        }
    }

    public static class S3ToSFTPTestResponse implements Serializable {
        private boolean success;

        public S3ToSFTPTestResponse() {
        }

        public S3ToSFTPTestResponse(boolean success) {
            this.success = success;
        }

        public boolean isSuccess() {
            return success;
        }
    }
}

相关问题