I am running an S3 mock test container and running the job locally on my machine. I'm sorry that I can't provide a minimal executable example: the codebase is quite large and I can't paste more details here due to compliance concerns. I'd appreciate any help. This configuration is consumed in the code by a streaming file sink connector.
def startJob(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val flinkConf = new Configuration()
  flinkConf.set(ConfigOptions.key("s3.access-key").stringType().defaultValue("s3_mock_access_key"), "s3_mock_access_key")
  flinkConf.set(ConfigOptions.key("s3.secret-key").stringType().defaultValue("s3_mock_secret_key"), "s3_mock_secret_key")
  flinkConf.set(ConfigOptions.key("s3.endpoint").stringType().defaultValue(s"http://$getS3MockHost:$getS3MockPort"), s"http://$getS3MockHost:$getS3MockPort")
  flinkConf.set(ConfigOptions.key("s3.path.style.access").booleanType().defaultValue(true), true)
  env.configure(flinkConf)

  /**
   * createExecutionGraph wires up a large number of operators; we pass the
   * env to it for our local end-to-end testing.
   */
  // jobclient is a field on the enclosing class
  jobclient = StreamingJob
    .createExecutionGraph(env, Option(DYNAMIC_CONFIGURATION_FILE_NAME), None, true)
    .executeAsync()
}
The error I get:

22:02:35,560 WARN org.apache.flink.runtime.taskmanager.Task - Sink: sinkoperator (1/1)#0 (7026634c61d9bbeba29061c72487b9a4) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on test-pdx01: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
1 answer
You cannot set the S3 credentials from within the job code, because the S3 file systems are loaded as plugins during Flink cluster startup. You can use either IAM or access keys, but in both cases the settings need to go into the flink-conf.yaml file. For details see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
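For the access-key approach, the flink-conf.yaml entries would look roughly like this (the key names are the documented Flink S3 options; the values, including the mock endpoint and port, are placeholders for your setup):

    s3.access-key: s3_mock_access_key
    s3.secret-key: s3_mock_secret_key
    s3.endpoint: http://localhost:9090
    s3.path.style.access: true

For a purely local end-to-end test you may not need a flink-conf.yaml at all: the local mini cluster initializes its file systems from the configuration it is started with, so passing the configuration when the environment is created (rather than via env.configure(), which runs after the file systems are already loaded) can work. A minimal sketch under that assumption; the mock host and port are placeholders for the question's getS3MockHost/getS3MockPort helpers, flink-s3-fs-hadoop is assumed to be on the test classpath, and the createLocalEnvironment overload is worth verifying against your Flink version:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    def startLocalJob(): Unit = {
      val s3MockHost = "localhost" // placeholder for the question's getS3MockHost
      val s3MockPort = 9090        // placeholder for the question's getS3MockPort

      val flinkConf = new Configuration()
      // Plain string settings are enough; the S3 filesystem reads these keys when it is initialized.
      flinkConf.setString("s3.access-key", "s3_mock_access_key")
      flinkConf.setString("s3.secret-key", "s3_mock_secret_key")
      flinkConf.setString("s3.endpoint", s"http://$s3MockHost:$s3MockPort")
      flinkConf.setString("s3.path.style.access", "true")

      // createLocalEnvironment hands the configuration to the mini cluster before
      // the file systems are initialized, unlike env.configure() on a running environment.
      val env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConf)

      // ... build and execute the job against env as before
    }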