我是新来的flink和我是按照socketwindowwordcount的例子。
我正在使用scala 2.11.8和flink 1.3.2,并尝试在emr上运行它,当我运行以下代码时,它抛出了错误:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation
主类如下所示:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkStreamingPOC {
def main(args: Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("s3a://somebucket/prefix")
val counts = stream.flatMap{ _.split("\\W+") }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(1)
counts.print
env.execute("Window Stream WordCount")
}
}
build.sbt如下所示:
scalaVersion := "2.11.8"
val flinkVersion = "1.3.2"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
我试过了 import org.apache.flink.api.scala._
以及 org.apache.flink.streaming.api.scala._
但还是收到了同样的错误信息。请建议,谢谢!
5条答案
按热度按时间h7wcgrx31#
你可能会遇到和我一样的问题,基本上就是把jar添加到
/lib
文件夹,请参阅此处了解更多详细信息。在amazon emr的情况下,您使用的是flink Jmeter 板。如你所见/opt
在lib文件夹中有所有需要复制的jarncecgwcz2#
打开build.sbt文件并删除
provided
从依赖项flmtquvp3#
我在课堂上也遇到了同样的问题
AverageSensorReadings
项目中:https://github.com/streaming-with-flink/examples-scala. 这是一个maven项目,所以我评论了所有<scope>provided</scope>
对于pom文件中的每个依赖项,它都是有效的。avkwfej44#
如果您使用idea,您可以包含具有“provided”范围的依赖项。
kyxcudwk5#
从flink项目构建的fat jar应该在flink集群环境中运行,因此所有与flink相关的依赖项都将由该环境提供。
其他的答案建议你简单的评论一下
provided
范围从依赖项中删除,因此将这些依赖项包含到fat jar中。这可能有效,但不正确。如果您使用如下命令运行jar
java --classpath target/your-project-jar.jar your.package.SocketWindowWordCount
,则您处于flink集群环境之外。正确的方法是使用
flink run ...
命令式./bin/flink run -c your.package.SocketWindowWordCount target/your-project-jar.jar
.尝试
./bin/flink run --help
详情。