问题摘要
- 我正在尝试为我的flink应用程序运行本地集成测试,它从AWS s3读取parquet文件,进行一些转换,并将输出写回s3;我使用以下代码:
- Flink的Parquet格式源码:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/formats/parquet/
- localstack/testcontainers在localhost上启动本地s3
- junit 5运行我的测试
- Flink minicluster在测试期间运行我的应用程序
- IntelliJ IDEA
- Flink版本1.17.0、maven和java 11
- 因为flink应用程序与s3交互(我相信上面的parquet格式需要它),所以我在项目中使用
flink-s3-fs-hadoop
作为测试依赖项
*问题: - 似乎没有办法将所有必需的AWS服务传递给
flink-s3-fs-hadoop
插件使用的S3客户端 - 由于上述原因,使用Flink和AWS s3进行本地测试目前是不可能的;当我的源代码试图读取我放在localstack s3中的parquet文件时,我遇到了
AmazonS3Exception 403 Forbidden
- 我需要通过的AWS认证如下:
- s3.access-key
- s3.secret-key
- s3.endpoint
- s3.endpoint.region
- s3.path.style.access
- 注意:应用程序在“prod”中工作,因为它在EMR上运行;只是在本地发生此问题
我所尝试的 - 添加AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY、AWS_ENDPOINT_URL和AWS_REGION作为环境变量(IntelliJ运行配置)
- 在我设置这些之前,我得到了一个错误,即无法找到访问密钥和密钥,所以我相信这些能够被拾取(在给定https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_via_the_AWS_Environment_Variables的情况下是有意义的)
- 其他环境变量没有被选中;我通过在调试模式下运行我的应用程序并查看AmazonS 3客户端参数来确认
- 在我的测试资源文件夹下添加一个
conf/flink-conf.yaml
,并设置以下环境变量FLINK_CONF_DIR=src/test/resources/conf
- 看起来Flink可以找到这个文件,但它没有将任何值传递给插件
- 使用指定的AWS脚本创建本地执行环境(请参见下面的代码)
Properties props = new Properties();
props.put("s3.access-key", "test");
props.put("s3.secret-key", "test");
props.put("s3.endpoint", "http://localstack:4566");
props.put("s3.endpoint.region", "us-east-1");
props.put("s3.path.style.access", "true");
Configuration configuration = ConfigurationUtils.createConfiguration(props);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(16, configuration);
字符串
- 我使用docker-compose在本地启动了一个Flink集群(jobmanager + taskmanager),在FLINK_PROPERTIES环境变量中添加了作为k,v对的AWS对象(然后添加到flink-conf),启用了
flink-s3-fs-hadoop
插件,并确认flink应用程序在提交到这个集群时可以正常工作(即,这确认了当添加对象并可用于插件时应用程序可以正常工作)。 - 注意:即使这种方式有效,但对于我的用例来说还不够好;我需要能够在像Junit这样的自动化测试套件中运行测试
services:
localstack:
container_name: "awslocal"
image: localstack/localstack:1.4.0
ports:
- "127.0.0.1:4510-4559:4510-4559"
- "127.0.0.1:4566:4566"
- "127.0.0.1:4571:4571"
environment:
- SERVICES=s3
- DEBUG=1
- LS_LOG=trace
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
- AWS_DEFAULT_REGION=us-east-1
- HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${TMPDIR:-/tmp}/localstack:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
jobmanager:
container_name: "jobmanager"
image: flink:1.17.0-java11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
s3.access-key: test
s3.secret-key: test
s3.endpoint: http://awslocal:4566
s3.endpoint.region: us-east-1
s3.path.style.access: true
- ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
volumes:
- /Users/myname/IdeaProjects/my-flink-app/target:/opt/flink/usrlib
taskmanager:
container_name: "taskmanager"
image: flink:1.17.0-java11
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
s3.access-key: test
s3.secret-key: test
s3.endpoint: http://awslocal:4566
s3.endpoint.region: us-east-1
s3.path.style.access: true
- ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
型
1条答案
按热度按时间ztyzrc3y1#
你可以试试这个:
1-使用以下命令创建测试配置文件(flink-config.yaml)
字符串
2-加载此配置,
型
关键字:
FileSystem.initialize(config, null);