flink模式在警报代码中遇到arralist问题?

ru9i0ody  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(246)

我遵循这个示例,并用kafkajson实现了相同的示例数据。
消费者样本数据 {"temperature" : 28,"machineName":"xyz"} ```
DataStream patternStream = CEP.pattern(inputEventStream, warningPattern)
.flatSelect(new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
private static final long serialVersionUID = 1L;

    @Override
    public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
        new Alert("Temperature Rise Detected:" + ((TemperatureEvent) event.get("first")).getTemperature()
                + " on machine name:" + ((MonitoringEvent) event.get("first")).getMachineName());

    }
现在我对arraylist cast有意见了

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at Test.KafkaApp.main(KafkaApp.java:61)

原因:java.lang.classcastexception:java.util.arraylist无法转换为test.temperatureevent at test.kafkaapp$2.flatselect(kafkaapp)。java:53)在org.apache.flink.cep.operator.flatselectcepoperator.processmatchedsequences(flatselectcepoperator。java:66)在org.apache.flink.cep.operator.abstractkeyedceppatternoperator.processevent(abstractkeyedceppatternoperator)。java:382)在org.apache.flink.cep.operator.abstractkeyedceppatternoperator.processelement(abstractkeyedceppatternoperator。java:198)在org.apache.flink.streaming.runtime.io.streaminputprocessor.processinput(streaminputprocessor。java:202)在org.apache.flink.streaming.runtime.tasks.oneinputstreamtask.run(oneinputstreamtask)。java:105)在org.apache.flink.streaming.runtime.tasks.streamtask.invoke(streamtask。java:300)在org.apache.flink.runtime.taskmanager.task.run(task。java:704)位于java.lang.thread.run(未知源代码)
nwlls2ji

nwlls2ji1#

您的代码包含两个问题:
首先 flatSelect 收到 Map<String, List<TemperatureEvent>> . 这意味着你可能得到多个 TemperatureEvents 每个图案。因此,你必须选择你想要的。
你没有添加任何 AlertsCollector<Alert> . 平面Map函数不返回值,而是通过 Collector<Alert> 不用编译,我觉得这个应该可以

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
    .flatSelect(
    new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

        @Override
        public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
            TemperatureEvent temperatureEvent = event.get("first").get(0);
            out.collect(new Alert("Temperature Rise Detected:" + temperatureEvent.getTemperature() + " on machine name:" + temperatureEvent.getMachineName()));
        }
       });

顺便说一下,o'reilly存储库中的链接代码不会用flink编译。这个 PatternSelectFunction 签名有误。

相关问题