使用aws java sdk v2获取s3select的部分json响应

yc0p9oo0  于 2023-02-20  发布在  Java
关注(0)|答案(1)|浏览(167)

我试图在一个 Spring Boot 应用程序中使用 S3 Select 查询 S3 存储桶中的 Parquet 文件,但只能从 S3 Select 的输出中获得部分结果。我使用的是 AWS Java SDK v2,请帮忙找出问题所在。
检查json输出(打印在控制台中)后,输出中的总字符数为65 k。
我正在使用Eclipse,并尝试取消选中控制台首选项中的“限制控制台输出”,但没有帮助。
代码如下:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompressionType;
import software.amazon.awssdk.services.s3.model.EndEvent;
import software.amazon.awssdk.services.s3.model.ExpressionType;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.JSONOutput;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.ParquetInput;
import software.amazon.awssdk.services.s3.model.RecordsEvent;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream.EventType;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;

/**
 * Runs an S3 Select SQL query against a Parquet object and prints the JSON result.
 *
 * <p>S3 Select returns its result as an event stream, and the selected records can be
 * spread over several {@link RecordsEvent}s. Reading only the first one yields a
 * truncated result, so this class concatenates the payload of every Records event.
 */
public class ParquetSelect {

    private static final String BUCKET_NAME = "<bucket-name>";
    private static final String KEY = "<object-key>";
    private static final String QUERY = "select * from S3Object s";

    /**
     * Client used to issue the query. This class never assigns it, so it has to be
     * set from outside before {@link #selectObjectContent()} is called.
     */
    public static S3AsyncClient s3;

    /**
     * Executes the query, waits for the event stream to finish and prints the complete
     * JSON output to standard output.
     *
     * <p>If the stream contains no Records event, an empty line is printed.
     */
    public static void selectObjectContent() {
        Handler handler = new Handler();

        // Block until the whole event stream has been delivered to the handler.
        selectQueryWithHandler(handler).join();

        // Collect raw bytes and decode once at the end: a multi-byte UTF-8 character
        // may be split across two consecutive Records events.
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        for (SelectObjectContentEventStream event : handler.receivedEvents) {
            if (event.sdkEventType() == EventType.RECORDS) {
                byte[] chunk = ((RecordsEvent) event).payload().asByteArray();
                payload.write(chunk, 0, chunk.length);
            }
        }

        System.out.println(new String(payload.toByteArray(), StandardCharsets.UTF_8));
    }

    /**
     * Builds the select request (Parquet input, JSON output) and starts it.
     *
     * @param handler receives the response, the event stream and any error
     * @return the future returned by {@link S3AsyncClient#selectObjectContent}
     */
    private static CompletableFuture<Void> selectQueryWithHandler(SelectObjectContentResponseHandler handler) {
        InputSerialization inputSerialization = InputSerialization.builder()
                                                                  .parquet(ParquetInput.builder().build())
                                                                  .compressionType(CompressionType.NONE)
                                                                  .build();

        OutputSerialization outputSerialization = OutputSerialization.builder()
                                                                     .json(JSONOutput.builder().build())
                                                                     .build();

        SelectObjectContentRequest select = SelectObjectContentRequest.builder()
                                                                      .bucket(BUCKET_NAME)
                                                                      .key(KEY)
                                                                      .expression(QUERY)
                                                                      .expressionType(ExpressionType.SQL)
                                                                      .inputSerialization(inputSerialization)
                                                                      .outputSerialization(outputSerialization)
                                                                      .build();

        return s3.selectObjectContent(select, handler);
    }

    /** Response handler that records everything it is given, for inspection after the call. */
    private static class Handler implements SelectObjectContentResponseHandler {
        /** Initial response, as passed to {@link #responseReceived}. */
        private SelectObjectContentResponse response;
        /** Every event published on the stream, in arrival order. */
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        /** Last error reported through {@link #exceptionOccurred}, if any. */
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            // Store every event; the caller filters for Records events afterwards.
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
            // Nothing to do here: the caller reads the collected events after join().
        }
    }

}
nle07wnf

nle07wnf1#

我看到您正在使用 selectObjectContent()。您是否尝试过调用 **s3AsyncClient.getObject()** 方法?这是否适合您?
例如,下面的代码示例从Amazon S3存储桶获取PDF文件,并将该PDF文件写入本地文件。

package com.example.s3.async;
// snippet-start:[s3.java2.async_stream_ops.complete]

// snippet-start:[s3.java2.async_stream_ops.import]
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
// snippet-end:[s3.java2.async_stream_ops.import]

// snippet-start:[s3.java2.async_stream_ops.main]

/**
 * Before running this Java V2 code example, set up your development environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */

public class S3AsyncStreamOps {

    /**
     * Downloads a single S3 object to a local file with the asynchronous client.
     *
     * @param args exactly three values: bucket name, object key, local file path
     */
    public static void main(String[] args) {

        final String usage = "\n"
                + "Usage:\n"
                + "    <bucketName> <objectKey> <path>\n\n"
                + "Where:\n"
                + "    bucketName - The name of the Amazon S3 bucket (for example, bucket1). \n\n"
                + "    objectKey - The name of the object (for example, book.pdf). \n"
                + "    path - The local path to the file (for example, C:/AWS/book.pdf). \n";

        // Anything other than three arguments prints the usage text and exits.
        if (args.length != 3) {
            System.out.println(usage);
            System.exit(1);
        }

        String bucket = args[0];
        String key = args[1];
        String destination = args[2];

        ProfileCredentialsProvider credentials = ProfileCredentialsProvider.create();
        S3AsyncClient client = S3AsyncClient.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(credentials)
                .build();

        GetObjectRequest request = GetObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build();

        // The transformer writes the object body to the destination file.
        AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> toFile =
                AsyncResponseTransformer.toFile(Paths.get(destination));
        CompletableFuture<GetObjectResponse> download = client.getObject(request, toFile);

        download.whenComplete((response, failure) -> {
            try {
                if (response == null) {
                    failure.printStackTrace();
                } else {
                    System.out.println("Object downloaded. Details: "+response);
                }
            } finally {
                // Only close the client when you are completely done with it.
                client.close();
            }
        });

        // Wait for the download to finish.
        download.join();
    }
}

相关问题