找不到方案“hdfs”的文件系统实现

guz6ccqo  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(539)

我正在努力将hdfs集成到flink中。
scala二进制版本:2.12,
flink(群集)版本:1.10.1
这里是hadoop\ conf\ dir;
hdfs的配置在这里;
这个配置和hadoop\u conf\u dir在taskmanager中也是一样的。
pom.xml;

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.10.0</version>
    </dependency>
</dependencies>

我只是想从hdfs中获取Parquet文件,我的示例代码就在那里;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    Types.MessageTypeBuilder builder = Types.buildMessage();

    MessageType messageType = builder
            .required(INT64).named("column1")
            .required(BINARY).as(UTF8).named("column2")
            .required(INT64).named("column3")
            .required(INT64).named("column4")
            .required(BINARY).as(UTF8).named("column5")
            .required(BINARY).named("column6")
            .named("AppendTest");

    ParquetTableSource parquetTableSource = ParquetTableSource.builder()
        .path("hdfs://hdfs:8020/historic_data/data.parquet")
        .withConfiguration(hadoopConf)
        .forParquetSchema(messageType)
        .build();

    tEnv.registerTableSource("datatable", parquetTableSource);

    Table table = tEnv.sqlQuery("select * from datatable");

    DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);

    tempDataSet.print();

    env.execute("Job name - short desc.");

错误在这里;

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more

在lib文件夹下看到hadoopuberjar,这是一个奇怪的部分
我就是这样服从工作的;
docker exec-it jobmanager env hadoop\u conf\u dir=/tmp/hadoopconf flink run-c文件:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar-d/tmp/core-batch-1.0.0.jar
我也尝试了用FlinkUI推送作业,但结果是一样的。
任何帮助都将不胜感激。
谢谢。

edqdpe6u

edqdpe6u1#

@aykut,我不确定这会有多大帮助,但我最近一直在测试Flink1.12,它是通过我在github上的ambari flink服务创建的自定义ambari服务交付给hdp3的。当我将flink安装到一个安装了yarn/hdfs的基本ambari集群中时,所有的依赖项都能完美地工作。一旦yarn flink应用程序在yarn集群中运行,我的应用程序测试的其余部分就工作得很好了。Yarn上的参考燧石。
以下是我的flink用户环境变量:

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/lib/*:/usr/hdp/3.1.4.0-315/hadoop/.//*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/./:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/.//*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/.//*:/usr/hdp/3.1.4.0-315/hadoop-yarn/./:/usr/hdp/3.1.4.0-315/hadoop-yarn/lib/*:/usr/hdp/3.1.4.0-315/hadoop-yarn/.//*:/usr/hdp/3.1.4.0-315/tez/*:/usr/hdp/3.1.4.0-315/tez/lib/*:/usr/hdp/3.1.4.0-315/tez/conf:/usr/hdp/3.1.4.0-315/tez/conf_llap:/usr/hdp/3.1.4.0-315/tez/doc:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-2.8-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib:/usr/hdp/3.1.4.0-315/tez/man:/usr/hdp/3.1.4.0-315/tez/tez-api-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-common-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-dag-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-examples-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-history-parser-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-javadoc-tools-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-job-analyzer-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-mapreduce-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-protobuf-history-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-internals-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-library-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-tests-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/ui:/usr/hdp/3.1.4.0-315/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/gcs-connector-1.9.10.3.1.4.0-315-shaded.jar:/usr/hdp/3.1.4.0-315/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-aws-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.4.0-315/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.4.0-315/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.4.0-315/tez/lib/tez.tar.gz;

这是我的执行命令:

/opt/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/bin/flink run javaProgram.jar

下面是示例java代码(通过flink将s3文件摄取到hdfs):

package org.myorg.quickstart;

import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3FlinkIngest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("s3a://s3Bucket/input.xml");
        dataStream.writeAsText("hdfs://hadoopHost:8020/user/flink/ingest.xml", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);;
        streamExecutionEnvironment.execute("S3 Flink Ingest");
    } }

相关问题