I am trying to run a CEP job on Flink, with the test data read from a local path. At first I set the file size to 1 GB and it ran fine, but when I increased the file size to 10 GB, the job failed with the exception below.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Failure happened in filter function.
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:292)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:136)
at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processElement(AbstractCEPPatternOperator.java:69)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processElement(KeyedCEPPatternOperator.java:147)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not find previous shared buffer entry with key: State(send, Normal, [
StateTransition(TAKE, done, with filter),
StateTransition(IGNORE, send),
]), value: cep.customer.Event_d@bd2b81a4 and timestamp: 1461851418716. This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there.
at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:269)
... 9 more
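From the message, it looks like an element belonging to an earlier part of a partial match was pruned from the CEP shared buffer before the pattern could complete. My pattern (in the code below) uses within(milliseconds(1000)), so partial matches older than one second get discarded. I wonder whether a wider match window would avoid the pruning; here is a minimal sketch of that change (Time.minutes(10) is an arbitrary value for illustration only, and Time is org.apache.flink.streaming.api.windowing.time.Time):

// Sketch only: the same pattern as in the full code below, but with a much
// wider match window so partial matches survive longer before the shared
// buffer prunes them. Time.minutes(10) is an arbitrary illustrative value.
Pattern<Event_d, ?> widerPattern = Pattern.<Event_d>begin("start")
        .where(new FilterFunction<Event_d>() {
            @Override
            public boolean filter(Event_d value) throws Exception {
                return value.getPrice().contains("start");
            }
        })
        // ... the same "send" and "done" steps as in the full code below ...
        .within(Time.minutes(10)); // instead of within(milliseconds(1000))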
Here is my code. Thanks for your help.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.util.Date;
import java.util.Map;

import static org.apache.flink.streaming.api.windowing.time.Time.milliseconds;

public class ReadFromFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.setParallelism(6);

        File dir = new File(System.getProperty("user.dir") + "/cep");
        // /data/tools/devlop/idea/flink-test/cep
        System.out.println(dir.getPath());
        if (!dir.exists()) {
            dir.mkdir();
        }

        // read the data from a local file; each line looks like this:
        // "2016-04-20T00:04:35.155Z","10.170.236.226","<p2-sidekiq> 2016-04-20T00:04:31.415Z 4982 TID-oxvsomclk AlterationWorker JID-34683abcb587e008153ce458 INFO: start"
        final DataStream<String> messageStream = env.readTextFile("file://" + dir);

        // keep only the lines that carry a JID and an INFO message
        DataStream<String> da = messageStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value != null && value.contains(" JID-") && value.contains("INFO: ");
            }
        });

        // split each line into (prefix, jid, message, "")
        DataStream<Tuple4<String, String, String, String>> t3 = da.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> map(String value) throws Exception {
                String[] info = value.split("INFO: ");
                if (info.length == 2) {
                    String[] jid = info[0].split(" JID-");
                    if (jid.length == 2) {
                        return new Tuple4<String, String, String, String>(jid[0], jid[1].trim(), info[1], "");
                    }
                }
                return null;
            }
        });

        // turn each tuple into an Event_d and key the stream by name
        DataStream<Event_d> input = t3.map(new MapFunction<Tuple4<String, String, String, String>, Event_d>() {
            @Override
            public Event_d map(Tuple4<String, String, String, String> value) throws Exception {
                return new Event_d(value.f0, value.f1, value.f2, value.f3);
            }
        }).keyBy(new KeySelector<Event_d, String>() {
            @Override
            public String getKey(Event_d value) throws Exception {
                return value.getName();
            }
        });

        // the pattern to detect: start --> send (SendThirdPartWorker) --> done
        Pattern<Event_d, ?> pattern = Pattern.<Event_d>begin("start").where(
                new FilterFunction<Event_d>() {
                    @Override
                    public boolean filter(Event_d value) throws Exception {
                        return value.getPrice().contains("start"); //&& MD5Util.MD5(value.getMd5())==;
                    }
                }).next("send").where(new FilterFunction<Event_d>() {
                    @Override
                    public boolean filter(Event_d value) throws Exception {
                        return value.getPrice().contains("SendThirdPartWorker"); //&& jidMap.get(value.getName())==value.getName();
                    }
                }).followedBy("done").where(new FilterFunction<Event_d>() {
                    @Override
                    public boolean filter(Event_d value) throws Exception {
                        return value.getPrice().contains("done"); //&& a;
                    }
                }).within(milliseconds(1000));

        final long mi1 = new Date().getTime();

        DataStream<String> result = CEP.pattern(input, pattern).select(
                new PatternSelectFunction<Event_d, String>() {
                    @Override
                    public String select(Map<String, Event_d> pattern) {
                        StringBuilder builder = new StringBuilder();
                        builder.append(dataComt(new Date().getTime(), mi1) + " " + pattern.get("start").getName())
                                .append(" -- ").append(pattern.get("send").getPrice());
                                //.append("--").append(pattern.get("done").getPrice());
                        return builder.toString();
                    }
                });

        result.writeAsText(dir + "/result", FileSystem.WriteMode.OVERWRITE);

        env.execute("Read from Kafka custom");
    }

    public static String dataComt(long current, long last) {
        long c = (current - last) / 1000;
        return "\"read " + c + "s \"";
    }
}
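One more thing I am unsure about: I never assign event-time timestamps myself, so the timestamp in the exception (1461851418716) must come from Flink's automatically assigned timestamps, and with 10 GB of replayed data the arrival order may diverge badly from the order in the log. In case that matters, here is a hedged sketch (not my current code) of assigning event-time timestamps explicitly. It assumes a newer Flink API (the one-argument AscendingTimestampExtractor from org.apache.flink.streaming.api.functions.timestamps, Flink 1.2+) and a hypothetical Event_d.getTimestamp() accessor returning the epoch millis parsed from the log line; eventStream stands for the Event_d stream before the keyBy:

// Sketch under the assumptions above, not a drop-in fix.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event_d> timestamped = eventStream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Event_d>() {
            @Override
            public long extractAscendingTimestamp(Event_d element) {
                // hypothetical accessor: epoch millis parsed from the log line
                return element.getTimestamp();
            }
        });
// ... then keyBy(...) and CEP.pattern(...) as in the code above ...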