Flink: Avro Parquet writer in RollingSink

3phpmpom  posted on 2021-06-25 in Flink

When I try to set an AvroParquetWriter in a RollingSink, I run into a problem: the sink path and the writer path seem to conflict.
Flink version: 1.1.3
Parquet Avro version: 1.8.1
Error:

[...]
12/14/2016 11:19:34 Source: Custom Source -> Sink: Unnamed(8/8) switched to CANCELED
INFO  JobManager - Status of job af0880ede809e0d699eb69eb385ca204 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /home/user/data/file
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:264)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:257)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:386)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:266)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:183)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:153)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:119)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:92)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:66)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
    at fr.test.SpecificParquetWriter.open(SpecificParquetWriter.java:28) // line in code => writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
    at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:451)
    at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:371)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 7 more
INFO  JobClientActor - 12/14/2016 11:19:34  Job execution switched to status FAILED.
12/14/2016 11:19:34 Job execution switched to status FAILED.
INFO  JobClientActor - Terminate JobClientActor.
[...]

Main:

RollingSink sink = new RollingSink<String>("/home/user/data");
sink.setBucketer(new DateTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new SpecificParquetWriter());
stream.addSink(sink);

SpecificParquetWriter:

public class SpecificParquetWriter<V> extends StreamWriterBase<V> {

    private transient AvroParquetWriter writer;

    private CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private int blockSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
    private int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public SpecificParquetWriter(){

    }

    @Override
    // workaround
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);

        writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
    }

    @Override
    public void write(Object element) throws IOException {
        if(writer != null)
            writer.write(element);
    }

    @Override
    public Writer duplicate() {
        return new SpecificParquetWriter();
    }
}

I don't know whether I'm doing this the right way...
Is there a simpler way?


a1o7rhls  #1

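The problem lies in the base class: Writer in the case of RollingSink, or StreamWriterBase in the case of BucketingSink. These only accept writers that can work on an OutputStream supplied by the sink, not writers that create and save the file on their own. For example:

writer = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, streamObject);

AvroParquetWriter and ParquetWriter, by contrast, take a file path:

writer = AvroParquetWriter.<V>builder(new Path("filePath"))
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withSchema(schema)
        .build();

After digging into ParquetWriter, I realized that what we are attempting does not make sense: Flink, being an event-at-a-time processing system like Storm, cannot write a single record to a Parquet file, whereas Spark Streaming can, because it works on the micro-batch principle.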
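The micro-batch principle mentioned above can be sketched in a few lines. This is only an illustration under stated assumptions: `MicroBatcher` is a hypothetical helper, not a Flink, Storm, or Spark API, and plain text files stand in for Parquet files. It buffers incoming records and writes a complete file only once a batch is full.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink, Storm, or Spark.
class MicroBatcher {
    private final Path dir;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int filesWritten = 0;

    MicroBatcher(Path dir, int batchSize) {
        this.dir = dir;
        this.batchSize = batchSize;
    }

    // Buffer one record; write a whole file once the batch is full.
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Write all buffered records as one new file, then clear the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        Path file = dir.resolve("batch-" + filesWritten + ".txt");
        try {
            Files.write(file, buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        filesWritten++;
        buffer.clear();
    }

    int filesWritten() {
        return filesWritten;
    }

    // Feeds 7 records through a batcher of size 3 and returns the file count.
    static int demo() {
        try {
            Path dir = Files.createTempDirectory("micro-batch-demo");
            MicroBatcher batcher = new MicroBatcher(dir, 3);
            for (int i = 0; i < 7; i++) {
                batcher.add("record-" + i);
            }
            batcher.flush(); // write the final, partially filled batch
            return batcher.filesWritten();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("files written: " + demo());
    }
}
```

Each file is written in one go from a complete batch, which is the property the answer attributes to micro-batch systems.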
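With Storm and Trident we can still write Parquet files, but with Flink we cannot until Flink introduces something like micro-batches.
So for this type of use case, Spark Streaming is the better choice.
If you want to use Flink, you can also fall back to batch processing.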
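To make the earlier point about the two writer styles more concrete, here is a minimal stdlib-only analogy. It assumes, as the stack trace in the question suggests, that the path-based writer creates its file without overwrite. None of the names below are Flink or Parquet classes; `WriterStyles` and its methods are hypothetical. A stream-based writer happily uses the stream the sink already opened, while a path-based writer that tries to create the same file itself fails because the file already exists.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Stdlib-only analogy; these are not Flink or Parquet types.
class WriterStyles {

    // Stream-based style: the sink owns the file and hands over a stream.
    static void writeToStream(OutputStream out, String record) {
        try {
            out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Path-based style: the writer insists on creating the file itself.
    // CREATE_NEW fails if the file exists, like a create without overwrite.
    static boolean createOwnFile(Path path) {
        try (OutputStream out = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW)) {
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true when the path-based writer collides with the sink's file.
    static boolean collides() {
        try {
            Path dir = Files.createTempDirectory("sink-demo");
            Path part = dir.resolve("part-0-0");
            // The "sink" creates the part file first and keeps the stream open.
            try (OutputStream sinkStream =
                         Files.newOutputStream(part, StandardOpenOption.CREATE_NEW)) {
                writeToStream(sinkStream, "record"); // stream-based writer works
                return !createOwnFile(part);          // path-based writer on the same path
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("path-based writer collides: " + collides());
    }
}
```

The same kind of collision would also be expected when several parallel sink instances all open one hard-coded path, as in the question's `open` method, although that reading of the trace is an inference rather than something the answer states.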
