I have a Flink 1.8.2 application running in the managed Kinesis Data Analytics environment. The application reads from a Kinesis data stream and writes aggregated data to S3. I am getting an "Access Denied" exception. I have verified the IAM role's permissions, and everything looks correct: the role has the necessary permissions for both the S3 bucket and the KMS key.
The bucket policy requires that data uploaded via the PutObject API be encrypted with a KMS key. How do I configure the StreamingFileSink connector in my Flink application to use KMS encryption?
StreamingFileSink code snippet:
private static StreamingFileSink<String> createS3SinkFromStaticConfigTest() {
    return StreamingFileSink
        .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.create()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                .withMaxPartSize(1024 * 1024)
                .build())
        .build();
}
Error:
"locationInformation": "org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1497)", "logger": "org.apache.flink.runtime.executiongraph.ExecutionGraph", "message": "Job Flink S3 Streaming Sink Job (f8901746927663ecb23b562ed4d85e37) switched from state RUNNING to FAILING.", "throwableInformation": [ "java.nio.file.AccessDeniedException: app/flink-data/2020-09-11--15/part-0-0: initiate MultiPartUpload on app/flink-data/2020-09-11--15/part-0-0: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied
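The StreamingFileSink itself has no encryption setting; server-side encryption is configured at the filesystem layer. With the flink-s3-fs-hadoop (s3a) filesystem, the Hadoop s3a encryption keys in flink-conf.yaml ask S3 to apply SSE-KMS to every object the sink writes. A minimal sketch under that assumption (the key ARN below is a placeholder, and on Kinesis Data Analytics these properties would have to be supplied through the managed runtime's configuration mechanism rather than a hand-edited flink-conf.yaml):

```yaml
# flink-conf.yaml — sketch, assuming the flink-s3-fs-hadoop (s3a) filesystem

# Request SSE-KMS server-side encryption for objects written to S3
fs.s3a.server-side-encryption-algorithm: SSE-KMS

# Placeholder ARN — replace with the CMK your bucket policy requires
fs.s3a.server-side-encryption.key: arn:aws:kms:REGION:ACCOUNT_ID:key/KEY_ID
```

Note that the stack trace fails on "initiate MultiPartUpload": with SSE-KMS, the role also needs kms:GenerateDataKey (and kms:Decrypt) on the key for multipart uploads to succeed, so it is worth re-checking the key policy for those actions specifically.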