emr上s3的外部检查点

toiithl6  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(306)

我正在尝试为我的flink程序部署一个生产集群。我正在使用一个安装了flink1.3.2的标准hadoop核心emr集群,使用yarn来运行它。
我正在尝试配置rocksdb,以便将检查点写入s3存储桶。我正在浏览这些文档:https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3文件系统。问题似乎是如何使依赖项正常工作。我在尝试运行程序时收到此错误:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273

我尝试过离开和调整core-site.xml并保持原样。我试过设置 HADOOP_CLASSPATH/usr/lib/hadoop/share 它包含(我假设是)上述指南中描述的大多数jar。我尝试下载hadoop2.7.2二进制文件,并将它们复制到flink/libs目录中。都会导致相同的错误。
有没有人成功地让flink能够在emr上写入s3?
编辑:我的群集设置
aws门户:

1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster ( I do fill out names/keys/etc)

集群启动后,我将ssh连接到主服务器并执行以下操作:

1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3  cd flink-1.3.2
4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5  Change conf/flink-conf.yaml 
6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar

my conf/flink-conf.yaml我添加了以下字段:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3:/bucket/location
state.checkpoints.dir: s3:/bucket/location

我的程序的检查点设置:

env.enableCheckpointing(getCheckpointRate,CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

如果你认为我遗漏了什么步骤,请告诉我

hk8txs48

hk8txs481#

我猜是你安装了Flink 1.3.2 因为亚马逊还没有提供flink 1.3.2 ,对吧?
考虑到这一点,似乎你有一个依赖冲突。方法 org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration) 只是在hadoop中引入的 2.4.0 . 所以我假设你已经部署了一个flink 1.3.2 使用hadoop构建的版本 2.3.0 . 请部署一个flink版本,该版本是用运行在emr上的hadoop版本构建的。这很可能解决所有依赖冲突。
将hadoop依赖项放入 lib 文件夹似乎无法可靠地工作,因为 flink-shaded-hadoop2-uber.jar 在类路径中似乎具有优先级。

相关问题