I have the sample Flink application below, which I am trying to run on a cluster.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ClusterConnect {
    public static void main(String[] args) throws Exception {
        // connect to the remote JobManager (address redacted)
        ExecutionEnvironment env = ExecutionEnvironment
                .createRemoteEnvironment("x.x.x.x", 6123);
        // get input data
        DataSet<String> text = env.fromElements("To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");
        // split each line on spaces and count the occurrences of each word
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : s.split(" ")) {
                            collector.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);
        // execute and print result
        counts.print();
        env.execute();
    }
}
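The cluster consists of one JobManager (a free-tier AWS instance) and two TaskManagers (free-tier AWS instances). When I try to run the Flink application above from a different AWS instance (one that can reach the JobManager and the TaskManagers), I get the following error.
Error from the application: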
WARN [akka.remote.ReliableDeliverySupervisor] Association with remote system [akka.tcp://flink@172.31.29.190:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
Log from the cluster's JobManager:
2016-11-30 22:00:42,796 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@172.31.6.190:33619] has failed, address is now gated for [5000] ms. Reason is: [scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388]
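The "local class incompatible" part of that log line is the message Java serialization produces when the sender and the receiver have loaded different versions of the same class, here scala.Option. A mismatch like this commonly points to different scala-library versions on the client and on the cluster, although that is an assumption and not something the logs confirm. As a minimal diagnostic sketch, the following could be run on both machines with the respective Flink/Scala jars on the classpath, and the printed values compared with the two in the log. The class name SerialVersionCheck and the helper serialVersionUidOf are hypothetical names made up for this illustration.

```java
import java.io.ObjectStreamClass;

public class SerialVersionCheck {

    // Returns the serialVersionUID that the local JVM associates with the named class,
    // or null if the class is not on the classpath or is not Serializable.
    static Long serialVersionUidOf(String className) {
        try {
            // load without initializing the class; only its descriptor is needed
            Class<?> cls = Class.forName(className, false,
                    SerialVersionCheck.class.getClassLoader());
            ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
            return desc == null ? null : desc.getSerialVersionUID();
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // defaults to the class named in the JobManager log
        String name = args.length > 0 ? args[0] : "scala.Option";
        System.out.println(name + " -> " + serialVersionUidOf(name));
    }
}
```

If the two machines print different values for scala.Option, the client and the cluster are not using the same Scala library, which would be consistent with the warning above.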