我目前正在使用下面的代码编写通过avroParquet。这段代码将其写入文件系统,但我想写入s3。
try {
StopWatch sw = StopWatch.createStarted();
Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
final String parquetFile = "parquet/data.parquet";
final Path path = new Path(parquetFile);
ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
.withSchema(avroSchema)
.withConf(new org.apache.hadoop.conf.Configuration())
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
.build();
for (Map<String, Object> row : message.getTransformedMessage()) {
StopWatch stopWatch = StopWatch.createStarted();
final GenericRecord record = new GenericData.Record(avroSchema);
row.forEach((k, v) -> {
record.put(k, v);
});
writer.write(record);
}
//todo: Write to S3. We should probably write via the AWS objects. This does not show that.
//https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an
writer.close();
System.out.println("Total Time: " + sw);
} catch (Exception e) {
//do somethign here. retryable? non-retryable? Wrap this excetion in one of these?
transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
}
这样可以很好地写入文件,但是如何将其流式传输到amazons3api中呢?我在网上发现了一些使用hadoopawsjar的代码,但是这需要一些windowsexe文件才能工作,当然,我们希望避免这种情况。目前我只使用:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
所以问题是,有没有一种方法可以截取avroparquetwriter上的输出流,这样我就可以将它流到s3?我想这样做的主要原因是为了重试。s3自动重试最多3次。这对我们有很大帮助。
2条答案
按热度按时间jexiocij1#
这取决于hadoopawsjar,所以如果你不愿意使用它,我不确定我能帮到你。不过,我是在mac上运行的,没有任何WindowsEXE文件,所以我不知道你说的这些文件是从哪里来的。avroparquetwriter已经依赖于hadoop,因此即使您无法接受这种额外的依赖性,对其他人来说也不是什么大问题:
您可以使用avroparquetwriter直接流式传输到s3,方法是向它传递使用uri参数创建的hadoop路径并设置适当的配置。
我使用了以下依赖项(sbt格式):
a5g8bdjr2#
希望我没有误解这个问题,但似乎在这里你正在做的是转换一个avro到Parquet,你想上传Parquet到s3
关闭parquetwriter后,应该调用如下所示的方法(假定该方法不会拦截从avro到parquet的流写入,它只是流式处理不再写入的parquet文件):
使用aws sdk
当然,该方法将驻留在不同的utils类中,并且该方法的构造函数应该使用凭据初始化amazons3s3client,因此您所需要做的就是调用并访问它的s3client成员来放置对象
希望这有帮助