Does the Apache Flink AWS S3 sink require Hadoop for local testing?

ntjbwcob, posted 2021-05-29 in Hadoop

I am fairly new to Apache Flink and I am trying to create a simple project that writes a file to an AWS S3 bucket. Based on the documentation, it looks like I need to have Hadoop installed to do this.
How do I set up my local environment so I can test this functionality? I have installed Apache Flink and Hadoop locally. I have made the necessary changes to Hadoop's core-site.xml configuration and added my HADOOP_CONF path to my flink.yaml configuration. When I try to submit my job locally through the Flink UI, I keep getting the following error:

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

I assume I am missing something in how my environment is set up. Is it possible to do this locally? Any help would be appreciated.

oyjwcjzk 1#

I had to do the following to run my Flink job, which sinks to S3, locally:
1 - Added flink-s3-fs-hadoop-1.9.1.jar to my flink/plugins/flink-s3-fs-hadoop directory
2 - Modified flink/conf/flink-conf.yaml to include:

s3.access-key: aws_access_key
s3.secret-key: aws_secret_key
fs.hdfs.hadoopconf: /etc/hadoop-config

I have the core-site.xml file in the hadoop-config folder, but it does not include any configuration, so fs.hdfs.hadoopconf may not be needed. A minimal job using this setup is sketched below.
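To make the setup above concrete, here is a minimal streaming job sketch. It is not taken from the original answer: it assumes Flink 1.9.x with the flink-s3-fs-hadoop plugin and the flink-conf.yaml entries above, and the bucket, prefix, object name, and socket source are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object S3SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // StreamingFileSink finalizes part files on checkpoints,
    // so enable checkpointing for output to become visible in S3
    env.enableCheckpointing(10000)

    // placeholder source, e.g. fed by `nc -lk 9999`
    val lines = env.socketTextStream("localhost", 9999)

    val sink = StreamingFileSink
      .forRowFormat(new Path("s3://<bucket>/<prefix>"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    lines.addSink(sink)
    env.execute("s3-sink-sketch")
  }
}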

os8fio9y 2#

While you do need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I just happened to try this by writing Parquet output, based on an Avro schema and a generated SpecificRecord, to S3. I am running a version of the following code locally through sbt and IntelliJ IDEA. Needed parts:
1) Specify the required Hadoop properties in the following file (note: defining your AWS access key/secret key here is not recommended; it is better to run on an EC2 instance that has the proper IAM role to read/write your S3 bucket, but it is needed for local testing):

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Imports:

import com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink code that uses a HadoopOutputFormat with the configuration above:

val events: DataSet[(Void, EventOnlyRecord)] = ...

val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance

// Note: AvroParquetOutputFormat extends FileOutputFormat[Void, T],
// so the key is Void and the value is of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
  outputFormat,
  outputJob
)

val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://<bucket>/<dir-prefix>")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

events.output(hadoopOutputFormat)

env.execute

...

def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
  val hadoopConfig = new HadoopConfiguration()
  hadoopConfig.addResource(new Path(hadoodConfigPath))
  hadoopConfig
}

4) Build dependencies and versions used (a build.sbt sketch wiring them together follows below):

val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkVersion = "1.1.4"

val flinkDependencies = Seq(
  ("org.apache.flink" %% "flink-scala" % flinkVersion),
  ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)

val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1")
)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
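For reference, a minimal build.sbt sketch that wires these sequences into a project could look like this; the project name and Scala version are assumptions, not part of the original answer, and the three dependency sequences are the ones defined above.

// build.sbt - minimal sketch; name and scalaVersion are assumptions
name := "flink-s3-parquet-example"

scalaVersion := "2.11.8"

libraryDependencies ++=
  providedFlinkDependencies ++ serializationDependencies ++ s3Dependencies

Because the Flink artifacts are marked "provided", they have to be put on the run classpath in some other way when running from the IDE or sbt, for example by including provided-scope dependencies in the IntelliJ run configuration.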

Edit: using writeAsText to S3:

1) Create a Hadoop configuration directory (referred to below as hadoop-conf-dir) with a core-site.xml file in it.
For example:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

# content of core-site.xml

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Create a directory (referred to below as flink-conf-dir) containing a flink-conf.yaml file.
For example:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

# content of flink-conf.yaml - continuing the earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) Edit the IntelliJ run configuration you use to run your S3 Flink job (Run - Edit Configurations) and add the following environment variable:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) Run the code with that environment variable set (a self-contained sketch follows below):

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute
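Putting the pieces together, a self-contained batch version of this might look like the sketch below. It is not from the original answer: the object name and example elements are made up, and it assumes FLINK_CONF_DIR points at the flink-config directory above so that fs.hdfs.hadoopconf is picked up.

import org.apache.flink.api.scala._

object WriteTextToS3 {
  def main(args: Array[String]): Unit = {
    // the local environment loads flink-conf.yaml from FLINK_CONF_DIR
    val env = ExecutionEnvironment.getExecutionEnvironment

    // placeholder data; <bucket>/<prefix-dir> as in the step above
    val events: DataSet[String] = env.fromElements("event-1", "event-2", "event-3")
    events.writeAsText("s3://<bucket>/<prefix-dir>")

    env.execute("write-text-to-s3")
  }
}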
