使用javasdk将多个文件批处理到amazons3

m0rkklqb  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(354)

我试图上传多个文件到amazons3都在同一个键下,通过附加文件。我有一个文件名列表,并想上传/附加文件的顺序。我几乎完全按照这个教程,但我循环通过每一个文件首先上传的一部分。因为文件在hdfs上(路径实际上是org.apache.hadoop.fs.path),所以我使用输入流来发送文件数据。下面是一些伪代码(我对教程中逐字逐句的代码块进行了注解):

// Create a list of UploadPartResponse objects. You get one of these for
// each part upload.
List<PartETag> partETags = new ArrayList<PartETag>();

// Step 1: Initialize.
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(
        bk.getBucket(), bk.getKey());
InitiateMultipartUploadResult initResponse =
        s3Client.initiateMultipartUpload(initRequest);
try {
      int i = 1; // part number
      for (String file : files) {
        Path filePath = new Path(file);

        // Get the input stream and content length
        long contentLength = fss.get(branch).getFileStatus(filePath).getLen();
        InputStream is = fss.get(branch).open(filePath);

        long filePosition = 0;
        while (filePosition < contentLength) {
            // create request
            //upload part and add response to our list
            i++;
        }
    }
    // Step 3: Complete.
    CompleteMultipartUploadRequest compRequest = new
          CompleteMultipartUploadRequest(bk.getBucket(),
          bk.getKey(),
          initResponse.getUploadId(),
          partETags);

    s3Client.completeMultipartUpload(compRequest);
} catch (Exception e) {
      //...
}

但是,我得到以下错误:

com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema (Service: Amazon S3; Status Code: 400; Error Code: MalformedXML; Request ID: 2C1126E838F65BB9), S3 Extended Request ID: QmpybmrqepaNtTVxWRM1g2w/fYW+8DPrDwUEK1XeorNKtnUKbnJeVM6qmeNcrPwc
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1109)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:741)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3743)
    at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:2617)

如果有人知道这一错误的原因,我们将不胜感激。或者,如果有更好的方法将一堆文件连接到一个s3密钥中,那也会很好。我尝试使用java的内置sequenceinputstream,但没有成功。任何帮助都将不胜感激。作为参考,所有文件的总大小可高达10-15 gb。

5fjcxozz

5fjcxozz1#

我知道可能有点晚了,但值得我献计献策。我已经设法解决了一个类似的问题使用 SequenceInputStream .
诀窍在于能够计算结果文件的总大小,然后将 SequenceInputStream 带着一个 Enumeration<InputStream> .
下面是一些可能有用的示例代码:

public void combineFiles() {
    List<String> files = getFiles();
    long totalFileSize = files.stream()
                               .map(this::getContentLength)
                               .reduce(0L, (f, s) -> f + s);

    try {
        try (InputStream partialFile = new SequenceInputStream(getInputStreamEnumeration(files))) {
            ObjectMetadata resultFileMetadata = new ObjectMetadata();
            resultFileMetadata.setContentLength(totalFileSize);
            s3Client.putObject("bucketName", "resultFilePath", partialFile, resultFileMetadata);
        }
    } catch (IOException e) {
        LOG.error("An error occurred while combining files. {}", e);
    }
}

private Enumeration<? extends InputStream> getInputStreamEnumeration(List<String> files) {
    return new Enumeration<InputStream>() {
        private Iterator<String> fileNamesIterator = files.iterator();

        @Override
        public boolean hasMoreElements() {
            return fileNamesIterator.hasNext();
        }

        @Override
        public InputStream nextElement() {
            try {
                return new FileInputStream(Paths.get(fileNamesIterator.next()).toFile());
            } catch (FileNotFoundException e) {
                System.err.println(e.getMessage());
                throw new RuntimeException(e);
            }
        }

    };
}

希望这有帮助!

相关问题