我是最近学习Apache Flink的超级新手,我想构建一个程序,实现:
当我对一串数字进行boardcast时,例如:Mqtt.fx中的3940351236363752(配置代码未显示,工作正常),我下面写的java程序可以排除离群值,这里是12和52,然后计算平均数,这是我的代码:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stream = env.addSource(new MqttConsumer());
DataStream<Integer> dataStream = stream.flatMap(new FlatMapFunction<String, Integer>() {
@Override
public void flatMap(String value, Collector<Integer> out) throws Exception {
String[] str = value.split(" ");
int temperature = Integer.parseInt(str[1]);
out.collect(temperature);
}
}).setParallelism(1);
DataStream<Integer> filteredDataStream = dataStream
.filter(temperature -> temperature != findMaxTemperature(dataStream) && temperature != findMinTemperature(dataStream))
.setParallelism(1);
DataStream<Double> averageDataStream = filteredDataStream
.map(temperature -> (double) temperature)
.reduce(( t1, t2) -> t1 + t2)
.map(sum -> sum / filteredDataStream.count())
.setParallelism(1);
averageDataStream.print();
env.execute();
}
private static int findMaxTemperature(DataStream<Integer> dataStream) throws Exception {
return dataStream.max(1).collect().get(0).getField(0);
}
private static int findMinTemperature(DataStream<Integer> dataStream) throws Exception {
return dataStream.min(1).collect().get(0).getField(0);
}
我还输入了这些包:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
但是,在reduce
下,有一个错误:The method reduce((<no type> t1, <no type> t2) -> {}) is undefined for the type SingleOutputStreamOperator<Double>
在min
和max
下,它说:The method max(int) is undefined for the type DataStream<Integer>
。
这似乎是一些类型转换的问题,但我试图修改它,我也试图使用keyBy()
,这是说是弃用,和maxBy()
,这也不工作。有人能帮我吗?
2条答案
按热度按时间yhxst69z1#
您尝试构建应用程序的方式将不会起作用。作为一个流处理器,Flink被设计为处理无限的数据流,并处理每个事件一次。如果不对整个流进行预处理,就无法确定流的max和min元素(无论这对无界流意味着什么),因此排除异常值是一件棘手的事情。
相反,您可以跟踪到目前为止在输入流中看到的max和min *,但排除这些也没有意义。
对于这个特定的应用程序,也许您更愿意以批处理模式执行它,在这种情况下,这些问题将消失,然后预期的结果将得到很好的定义。
a11xaf1n2#
正如大卫所指出的,在无界流中无法找到最小值或最大值。如果您必须将其作为流式作业运行,则可以使用Apache DataSketches来计算流式/近似分位数,并排除超出界限的值。还有一个 Bootstrap 问题,在开始过滤之前,您希望处理和累积一些值,以避免误报。