When I run an Apache Flink streaming job with AWS S3 as the sink, the standard version (forRowFormat) works fine:
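```java
import java.io.OutputStream;
import java.io.PrintStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> s3sink = StreamingFileSink
        // Row format: each record is encoded on its own into the in-progress part file
        .forRowFormat(new Path(s3Url),
                (String element, OutputStream stream) -> {
                    PrintStream out = new PrintStream(stream);
                    out.println(element);
                })
        // BucketAssigner is a Flink interface, so this is presumably the poster's own class
        .withBucketAssigner(new BucketAssigner())
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withMaxPartSize(100)          // roll after 100 bytes
                .withRolloverInterval(30000)   // or after 30 s (value is in milliseconds)
                .build())
        .withBucketCheckInterval(100)          // check time-based rolling every 100 ms
        .build();
```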
When I run the same thing with the bulk format and CompressWriterFactory:
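```java
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
StreamingFileSink<String> s3sink = StreamingFileSink
        // Bulk format: in Flink 1.10 part files are rolled on every checkpoint
        .forBulkFormat(new Path(s3Url),
                new CompressWriterFactory<>(new DefaultExtractor<String>()))
        .withOutputFileConfig(outputFileConfig) // an OutputFileConfig defined elsewhere
        .build();
```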
it fails with the following error.
(Note: CompressWriterFactory works fine with the HDFS scheme, 'hdfs://host:port/path'.)
```
java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:112)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:126)
    at org.apache.flink.formats.compress.writers.NoCompressionBulkWriter.finish(NoCompressionBulkWriter.java:56)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:62)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:250)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:241)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:422)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    ...
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
```
Notes:
Flink version: 1.10.0
s3Url = "s3a://bucket/folder/path";
2 answers
gojuced71#
This looks like a bug. You could open a JIRA ticket describing it.
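Why this looks like a bug rather than a misconfiguration can be seen from the stack trace alone: when a part file is closed on checkpoint, `NoCompressionBulkWriter.finish()` calls `sync()` on the part-file stream, and `S3RecoverableFsDataOutputStream` rejects `sync()`, whereas an HDFS stream accepts it. The self-contained model below reproduces just that call path. All names in it (`FsStream`, `HdfsLikeStream`, `S3LikeStream`, `finishWithoutCodec`, `tryFinish`) are hypothetical stand-ins, not Flink classes, and the HDFS stub simply treats `sync()` as a no-op.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class SyncBugDemo {

    // Hypothetical stand-in for a part-file stream: write() buffers bytes in memory,
    // sync() is the "make this durable" call that the bulk writer issues on finish.
    abstract static class FsStream extends OutputStream {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public void write(int b) {
            buffer.write(b);
        }

        abstract void sync() throws IOException;
    }

    // HDFS-like stream: sync() is supported (modelled as a no-op).
    static class HdfsLikeStream extends FsStream {
        @Override
        void sync() {
            // nothing to persist beyond the in-memory buffer in this model
        }
    }

    // S3-like stream: sync() is rejected with the message from the stack trace.
    static class S3LikeStream extends FsStream {
        @Override
        void sync() {
            throw new UnsupportedOperationException(
                    "S3RecoverableFsDataOutputStream cannot sync state to S3. "
                            + "Use persist() to create a persistent recoverable intermediate point.");
        }
    }

    // Writes newline-terminated elements, then calls sync() when the part file is
    // finished, mirroring the NoCompressionBulkWriter.finish() -> sync() frames above.
    static void finishWithoutCodec(FsStream out, String... elements) throws IOException {
        for (String e : elements) {
            out.write((e + "\n").getBytes(StandardCharsets.UTF_8));
        }
        out.sync();
    }

    // Returns null if finishing succeeded, otherwise the exception message.
    static String tryFinish(FsStream out) {
        try {
            finishWithoutCodec(out, "a", "b");
            return null;
        } catch (UnsupportedOperationException | IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("HDFS-like: " + tryFinish(new HdfsLikeStream()));
        System.out.println("S3-like:   " + tryFinish(new S3LikeStream()));
    }
}
```

The same writer succeeds against the HDFS-like stream and fails against the S3-like one, which matches the observation in the question that the HDFS scheme works. The row-format sink presumably avoids the problem because it does not go through a `BulkWriter` and its `finish()` call.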
qfe3c7zg2#
Just found a workaround for this issue: specify a Hadoop compression codec:
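As far as the stack trace suggests, the workaround helps because once a codec is configured, `CompressWriterFactory` no longer creates a `NoCompressionBulkWriter`, so the `sync()` call that S3 rejects is never made. In Flink 1.10 that would look roughly like `new CompressWriterFactory<>(new DefaultExtractor<String>()).withHadoopCompression("Gzip")`, assuming the `withHadoopCompression(String)` method and Hadoop's codec classes on the classpath; the answerer's exact code may differ. The stdlib-only model below assumes a codec-backed writer finishes by finishing the compressor and flushing, without calling `sync()`. `java.util.zip.GZIPOutputStream` stands in for a Hadoop codec, and `S3LikeStream`, `finishWithCodec` and `gunzip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CodecWorkaroundDemo {

    // Hypothetical S3-like part-file stream: write/flush work, sync() is rejected.
    static class S3LikeStream extends OutputStream {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public void write(int b) {
            buffer.write(b);
        }

        void sync() {
            throw new UnsupportedOperationException("cannot sync state to S3");
        }
    }

    // Models a codec-backed bulk writer: elements go through a compressing stream and
    // finishing only completes the compressed data and flushes; sync() is never called.
    static void finishWithCodec(S3LikeStream out, String... elements) throws IOException {
        GZIPOutputStream compressor = new GZIPOutputStream(out);
        for (String e : elements) {
            compressor.write((e + "\n").getBytes(StandardCharsets.UTF_8));
        }
        compressor.finish(); // writes the gzip trailer without closing `out`
        out.flush();
    }

    // Decompresses the part-file bytes back to text to check the file is complete.
    static String gunzip(byte[] data) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                plain.write(chunk, 0, n);
            }
            return new String(plain.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        S3LikeStream out = new S3LikeStream();
        finishWithCodec(out, "a", "b");
        System.out.print(gunzip(out.buffer.toByteArray()));
    }
}
```

Running `main` prints the two decompressed lines, so the part file is complete even though `sync()` was never called. The trade-off is that the files written to S3 are now compressed, so downstream readers must decompress them; a matching suffix such as `.gz` can be set through the `OutputFileConfig`.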