我正在尝试将我的flink作业(在emr上运行的v1.8)从使用bucketingsink过渡到更新的streamingfilesink。
我已经运行了新代码,几乎所有的东西看起来都很好。文件被写入s3并转换为完成。唯一的问题是,s3的acl设置与旧代码不同。
我有我的 core-site.xml
像这样设置
<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>
我也在用 s3a://
作为中路径的前缀 forRowFormat()
StreamingFileLink生成器的参数。
另外,当切换到streamingfilesink时,我必须向build.gradle添加一个新的依赖项
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
当我使用bucketingsinkapi时,我不太清楚我是如何在没有这个jar的情况下使用s3a://前缀来编写s3的。不知何故,我现在正在以一种不尊重core-site.xml设置的方式编写s3。
1条答案
按热度按时间gopyfrb31#
经过反复试验,我发现在我的
flink-conf.yml
解决了这个问题。