如何使用avroparquetwriter并通过amazons3api写入s3?

koaltpgm  于 2021-05-27  发布在  Hadoop
关注(0)|答案(2)|浏览(746)

我目前正在使用下面的代码编写通过avroParquet。这段代码将其写入文件系统,但我想写入s3。

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      StopWatch stopWatch = StopWatch.createStarted();
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach((k, v) -> {
        record.put(k, v);
      });
      writer.write(record);
    }
    //todo:  Write to S3.  We should probably write via the AWS objects.  This does not show that.
    //https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an
    writer.close();
    System.out.println("Total Time: " + sw);

  } catch (Exception e) {
    //do somethign here.  retryable?  non-retryable?  Wrap this excetion in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
  }

这样可以很好地写入文件,但是如何将其流式传输到amazons3api中呢?我在网上发现了一些使用hadoopawsjar的代码,但是这需要一些windowsexe文件才能工作,当然,我们希望避免这种情况。目前我只使用:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

所以问题是,有没有一种方法可以截取avroparquetwriter上的输出流,这样我就可以将它流到s3?我想这样做的主要原因是为了重试。s3自动重试最多3次。这对我们有很大帮助。

jexiocij

jexiocij1#

这取决于hadoopawsjar,所以如果你不愿意使用它,我不确定我能帮到你。不过,我是在mac上运行的,没有任何WindowsEXE文件,所以我不知道你说的这些文件是从哪里来的。avroparquetwriter已经依赖于hadoop,因此即使您无法接受这种额外的依赖性,对其他人来说也不是什么大问题:
您可以使用avroparquetwriter直接流式传输到s3,方法是向它传递使用uri参数创建的hadoop路径并设置适当的配置。

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

我使用了以下依赖项(sbt格式):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"
a5g8bdjr

a5g8bdjr2#

希望我没有误解这个问题,但似乎在这里你正在做的是转换一个avro到Parquet,你想上传Parquet到s3
关闭parquetwriter后,应该调用如下所示的方法(假定该方法不会拦截从avro到parquet的流写入,它只是流式处理不再写入的parquet文件):

AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
        S3Path outputPath = new S3Path();
        outputPath.setBucket("YOUR_BUCKET");
        outputPath.setKey("YOUR_FOLDER_PATH");
        try {
            InputStream parquetStream = new FileInputStream(new File(parquetFile));
            s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

使用aws sdk

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.749</version>
</dependency>

当然,该方法将驻留在不同的utils类中,并且该方法的构造函数应该使用凭据初始化amazons3s3client,因此您所需要做的就是调用并访问它的s3client成员来放置对象
希望这有帮助

相关问题