I am trying to integrate HDFS with Flink.
Scala binary version: 2.12
Flink (cluster) version: 1.10.1
Here is the HADOOP_CONF_DIR;
The HDFS configuration is here;
This configuration and HADOOP_CONF_DIR are the same on the taskmanager as well.
pom.xml;
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.10.0</version>
    </dependency>
</dependencies>
I'm just trying to read Parquet files from HDFS; here is my sample code:
// Imports assumed for this snippet (Flink 1.10 / parquet-mr); hadoopConf is defined elsewhere.
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

// Parquet schema matching the file layout.
Types.MessageTypeBuilder builder = Types.buildMessage();
MessageType messageType = builder
        .required(INT64).named("column1")
        .required(BINARY).as(UTF8).named("column2")
        .required(INT64).named("column3")
        .required(INT64).named("column4")
        .required(BINARY).as(UTF8).named("column5")
        .required(BINARY).named("column6")
        .named("AppendTest");

// hadoopConf is an org.apache.hadoop.conf.Configuration built elsewhere (see the sketch below).
ParquetTableSource parquetTableSource = ParquetTableSource.builder()
        .path("hdfs://hdfs:8020/historic_data/data.parquet")
        .withConfiguration(hadoopConf)
        .forParquetSchema(messageType)
        .build();

tEnv.registerTableSource("datatable", parquetTableSource);
Table table = tEnv.sqlQuery("select * from datatable");
DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);
tempDataSet.print();
env.execute("Job name - short desc.");
The error is here:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more
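For reference, the scheme resolution that fails in this trace can be reproduced in isolation. A minimal probe, assuming the same hdfs://hdfs:8020 endpoint used in the table source above:

// Hypothetical standalone check (not from the original post): resolves the 'hdfs' scheme the same way
// FileInputFormat.createInputSplits() does, and throws UnsupportedFileSystemSchemeException (an IOException)
// when no Hadoop file system support can be loaded.
org.apache.flink.core.fs.Path probe =
        new org.apache.flink.core.fs.Path("hdfs://hdfs:8020/historic_data/data.parquet");
try {
    org.apache.flink.core.fs.FileSystem fs = probe.getFileSystem();
    System.out.println("Resolved file system: " + fs.getClass().getName());
} catch (java.io.IOException e) {
    System.err.println("Cannot resolve the 'hdfs' scheme: " + e.getMessage());
}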
The strange part is that the Hadoop uber jar is right there under the lib folder.
This is how I submit the job:
docker exec -it jobmanager env HADOOP_CONF_DIR=/tmp/hadoopconf flink run -C file:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -d /tmp/core-batch-1.0.0.jar
I also tried submitting the job through the Flink UI, but the result is the same.
Any help would be appreciated.
Thanks.
1 Answer
@aykut, I'm not sure how much this will help, but I've recently been testing Flink 1.12, delivered to HDP 3 via a custom Ambari service created from my ambari-flink-service on GitHub. When I install Flink into a basic Ambari cluster that already has YARN/HDFS, all of the dependencies work perfectly. Once the Flink-on-YARN application is running in the YARN cluster, the rest of my application tests work fine. This is Flink on YARN, for reference.
Here are my Flink user environment variables:
Here is my execution command:
Here is the sample Java code (ingesting an S3 file into HDFS via Flink):
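The environment variables, command, and code referenced above are not shown here. Purely as an illustration of the kind of job being described (not the answerer's original code), a minimal Flink 1.12 sketch that reads a text file from S3 and writes it to HDFS could look like the following; the bucket, paths, and class name are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3ToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source: a text file in S3 (needs an S3 file system plugin on the cluster).
        DataStream<String> lines = env.readTextFile("s3a://example-bucket/input/data.txt");

        // Hypothetical sink: rolling text files under an HDFS path.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://namenode:8020/ingest/output"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        lines.addSink(sink);
        env.execute("s3-to-hdfs-sample");
    }
}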