I am following this example and have implemented the same example with Kafka JSON sample data.
Consumer sample data: `{"temperature" : 28,"machineName":"xyz"}`
```
DataStream patternStream = CEP.pattern(inputEventStream, warningPattern)
        .flatSelect(new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
                new Alert("Temperature Rise Detected:" + ((TemperatureEvent) event.get("first")).getTemperature()
                        + " on machine name:" + ((MonitoringEvent) event.get("first")).getMachineName());
            }
```
Now I am getting a problem with an ArrayList cast:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at Test.KafkaApp.main(KafkaApp.java:61)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to Test.TemperatureEvent
at Test.KafkaApp$2.flatSelect(KafkaApp.java:53)
at org.apache.flink.cep.operator.FlatSelectCepOperator.processMatchedSequences(FlatSelectCepOperator.java:66)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:382)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:198)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Unknown Source)
1 Answer
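Your code contains two problems:
First, `flatSelect` receives a `Map<String, List<TemperatureEvent>>`. This means you can get multiple `TemperatureEvent`s per pattern name, so `event.get("first")` returns a `List`, not a single event. Casting that list to `TemperatureEvent` is what throws the `ClassCastException`; you have to select the element you want.
Second, you never add any `Alert` to the `Collector<Alert>`. A flat select function does not return values; it emits them through the `Collector<Alert>`.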
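The two fixes above can be sketched without Flink on the classpath. This is a minimal sketch under stated assumptions: `TemperatureEvent`, `Alert`, and the one-method `Collector` interface below are hypothetical stand-ins (not the asker's classes and not Flink's `org.apache.flink.util.Collector`), and the static `flatSelect` method only mirrors the body you would place inside `PatternFlatSelectFunction#flatSelect`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FlatSelectSketch {

    // Hypothetical stand-in for the asker's TemperatureEvent class.
    static class TemperatureEvent {
        private final String machineName;
        private final double temperature;

        TemperatureEvent(String machineName, double temperature) {
            this.machineName = machineName;
            this.temperature = temperature;
        }

        String getMachineName() { return machineName; }
        double getTemperature() { return temperature; }
    }

    // Hypothetical stand-in for the asker's Alert class.
    static class Alert {
        final String message;

        Alert(String message) { this.message = message; }
    }

    // Assumption: a one-method stand-in for Flink's Collector, so the sketch runs without Flink.
    interface Collector<T> {
        void collect(T record);
    }

    // Same parameter shape as PatternFlatSelectFunction#flatSelect.
    static void flatSelect(Map<String, List<TemperatureEvent>> match, Collector<Alert> out) {
        // Fix 1: match.get("first") is a List, so pick an element instead of casting the list.
        TemperatureEvent first = match.get("first").get(0);

        // Fix 2: emit the alert through the collector instead of discarding it.
        out.collect(new Alert("Temperature Rise Detected:" + first.getTemperature()
                + " on machine name:" + first.getMachineName()));
    }

    public static void main(String[] args) {
        // Simulate one match where the pattern named "first" captured a single event.
        Map<String, List<TemperatureEvent>> match = new HashMap<>();
        match.put("first", Arrays.asList(new TemperatureEvent("xyz", 28.0)));

        List<Alert> alerts = new ArrayList<>();
        flatSelect(match, alerts::add);

        for (Alert alert : alerts) {
            System.out.println(alert.message);
        }
    }
}
```

In the real job the same two statements would go inside the anonymous `PatternFlatSelectFunction`. Taking index 0 is only one choice; if the pattern can capture several events under `"first"`, iterating over the list and emitting one `Alert` per event may be more appropriate.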
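I have not compiled it, but I think selecting one event from the list and emitting the `Alert` through the collector should do the trick.
By the way, the linked code from the O'Reilly repository will not compile with Flink: the `PatternSelectFunction` there has the wrong signature.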