Added Depedency Pom Details :
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop</artifactId>
<version>1.7.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.529</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>1.1.5</version>
<type>pom</type>
</dependency>
</dependencies>
java.lang.unsupportedoperationexception:只有在org.apache.flink.runtime.fs.hdfs.hadooprecoverablewriter(hadooprecoverablewriter)上的hdfs和HadoopVersion2.7或更新版本才支持hadoop上的可恢复写入程序。java:57)在org.apache.flink.runtime.fs.hdfs.hadoopfilesystem.createrecoverablewriter(hadoopfilesystem)上。java:202)在org.apache.flink.core.fs.safetynetwrapperfilesystem.createrecoverablewriter(safetynetwrapperfilesystem)。java:69)在org.apache.flink.streaming.api.functions.sink.filesystem.bucket。java:112)位于org.apache.flink.streaming.api.functions.sink.filesystem.streamingfilesink$rowformatbuilder.createbuckets(streamingfilesink)。java:242)在org.apache.flink.streaming.api.functions.sink.filesystem.streamingfilesink.initializestate(streamingfilesink)。java:327)在org.apache.flink.streaming.util.functions.streamingfunctionutils.tryrestorefunction(streamingfunctionutils。java:178)在org.apache.flink.streaming.util.functions.streamingfunctionutils.restorefunctionstate(streamingfunctionutils。java:160)位于org.apache.flink.streaming.api.operators.abstractudfstreamoperator.initializestate(abstractudfstreamoperator)。java:96)在org.apache.flink.streaming.api.operators.abstractstreamoperator.initializestate(abstractstreamoperator。java:278)在org.apache.flink.streaming.runtime.tasks.streamtask.initializestate(streamtask。java:738)在org.apache.flink.streaming.runtime.tasks.streamtask.invoke(streamtask。java:289)在org.apache.flink.runtime.taskmanager.task.run(任务。java:704)在java.lang.thread.run(线程。java:748)
2条答案
按热度按时间zbsbpyhn1#
flink使用一种叫做serviceloader的东西来加载与可插入文件系统接口所需的组件。如果你想看看flink在代码里是怎么做的,那就去
org.apache.flink.core.fs.FileSystem
. 注意这个问题initialize
函数,它利用RAW_FACTORIES
变量。RAW_FACTORIES
是由函数创建的loadFileSystems
,您可以看到它利用了java的ServiceLoader
.在flink上启动应用程序之前,需要先设置文件系统组件。这意味着您的flink应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。
emr没有提供flink需要的s3文件系统组件来将s3用作流文件接收器。抛出这个异常并不是因为版本不够高,而是因为flink加载hadoop文件系统时没有与该版本匹配的文件系统
s3
方案(见此处代码)。您可以通过为my flink应用程序启用调试日志记录级别(emr允许您在配置中执行此操作)来查看文件系统是否正在加载:
相关的日志在yarn资源管理器中可用,查看单个节点的日志。搜索字符串
"Added file system"
应该可以帮助您找到所有成功加载的文件系统。在这个研究中,还可以使用ssh连接到主节点并使用flink scala repl,在这里我可以看到给定文件uri时flink决定加载什么文件系统。
解决方案是将s3文件系统实现的jar放到
/usr/lib/flink/lib/
在开始flink应用程序之前。这可以通过获取flink-s3-fs-hadoop
或者flink-s3-fs-presto
(取决于您使用的实现)。我的引导动作脚本如下所示:pzfprimi2#
为了使用Flink的
StreamingFileSink
只有一次保证,您需要使用hadoop>=2.7
. 以下版本2.7
不支持。因此,请确保您在emr上运行的是最新的hadoop版本。