Flink方法未定义特定类型

nnt7mjpx  于 2023-09-28  发布在  Apache
关注(0)|答案(2)|浏览(112)

我是最近学习Apache Flink的超级新手,我想构建一个程序,实现:
当我对一串数字进行boardcast时,例如:Mqtt.fx中的3940351236363752(配置代码未显示,工作正常),我下面写的java程序可以排除离群值,这里是12和52,然后计算平均数,这是我的代码:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> stream = env.addSource(new MqttConsumer());

        DataStream<Integer> dataStream = stream.flatMap(new FlatMapFunction<String, Integer>() {
            @Override
            public void flatMap(String value, Collector<Integer> out) throws Exception {
                String[] str = value.split(" ");
                int temperature = Integer.parseInt(str[1]);
                out.collect(temperature);
            }
        }).setParallelism(1);

        

        DataStream<Integer> filteredDataStream = dataStream
                .filter(temperature -> temperature != findMaxTemperature(dataStream) && temperature != findMinTemperature(dataStream))
                .setParallelism(1);

        DataStream<Double> averageDataStream = filteredDataStream
                .map(temperature -> (double) temperature)
                .reduce(( t1,  t2) ->  t1 + t2)
                .map(sum -> sum / filteredDataStream.count())
                .setParallelism(1);

        averageDataStream.print();
        env.execute();
    }

    private static int findMaxTemperature(DataStream<Integer> dataStream) throws Exception {
        return dataStream.max(1).collect().get(0).getField(0);
    }

    private static int findMinTemperature(DataStream<Integer> dataStream) throws Exception {
        return dataStream.min(1).collect().get(0).getField(0);
    }

我还输入了这些包:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

但是,在reduce下,有一个错误:The method reduce((<no type> t1, <no type> t2) -> {}) is undefined for the type SingleOutputStreamOperator<Double>
minmax下,它说:The method max(int) is undefined for the type DataStream<Integer>
这似乎是一些类型转换的问题,但我试图修改它,我也试图使用keyBy(),这是说是弃用,和maxBy(),这也不工作。有人能帮我吗?

yhxst69z

yhxst69z1#

您尝试构建应用程序的方式将不会起作用。作为一个流处理器,Flink被设计为处理无限的数据流,并处理每个事件一次。如果不对整个流进行预处理,就无法确定流的max和min元素(无论这对无界流意味着什么),因此排除异常值是一件棘手的事情。
相反,您可以跟踪到目前为止在输入流中看到的max和min *,但排除这些也没有意义。
对于这个特定的应用程序,也许您更愿意以批处理模式执行它,在这种情况下,这些问题将消失,然后预期的结果将得到很好的定义。

a11xaf1n

a11xaf1n2#

正如大卫所指出的,在无界流中无法找到最小值或最大值。如果您必须将其作为流式作业运行,则可以使用Apache DataSketches来计算流式/近似分位数,并排除超出界限的值。还有一个 Bootstrap 问题,在开始过滤之前,您希望处理和累积一些值,以避免误报。

相关问题