为什么这个processwindowfunction总是计算每个传入元素而不是一个窗口的所有元素?

zsbz8rwp  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(337)

我正在努力建立一个apache flink流作业,它计算非常简单的物联网数据。它使用rabbitmq的(source),因此使用rmqsource。这很好,而且对这些数据的解析也很好。
然而,对于这个解析的数据流(triplet'string,double,long'(sensorid,value of'pm2,5',timestamp)类型),之后应用的函数似乎很奇怪。
首先,我想在传感器上设置流的密钥。
第二,我想创建一个窗口,其中包含每10秒或15秒根据id设置关键帧的所有元素。
第三,应该在这个窗口上执行一个非常基本的ProcessWindow函数,它只计算该窗口中的元素基本上,就像文档中的例子。
最后,processwindowfunction的输出应该打印到std.out。
你可以看到下面的相关部分。我使用jmeter和mqtt以及kafkameter插件发送测试数据,一次发送大约50个请求,然后看看会发生什么。
当我发送10个请求时,结果如下所示:

nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(

对于我的逻辑,这意味着processwindowfunction为每个值计算,而不是为一个窗口计算一次。
我现在的问题是:
有什么我不知道的吗?为什么它不只计算一次函数并输出一个窗口中的元素数呢(我已经尝试了几十种不同的版本,通读了所有相关的stackoverflow帖子,阅读了大量不同的教程,等等,都没有效果)。
打印到std.out是否与此有关(也尝试将其写入文件=>相同的行为)?
我也试过用这个 .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 希望它能工作,但它没有。
非常感谢你的帮助。

extractedDataStream
                .keyBy(t -> t.getValue0()) // keyed by sensor IDs
                //.timeWindow(Time.seconds(10))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new DetectTooHighAirPollution())
                .print();

        // execute program
        env.execute("MQTT Detection StreamingJob");

    }

    public static class DetectTooHighAirPollution
            extends ProcessWindowFunction<Triplet<String, Double, Long>, String, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {

            long count = 0;

            for (Triplet<String, Double, Long> i : input) {
                count++;
            }

            if (count > 1) {
                out.collect("yap :D!: " + count);
            } else {
                out.collect("nope :(");
            }

             }
        }
    }
}

为了完整性,代码的其余部分将执行它应该执行的操作:
ps:我正在发送mqtt消息,有效负载是json对象,我现在“手动”解析它。
pps:已删除配置详细信息。

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.javatuples.Triplet;
import java.io.IOException;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //env.setParallelism(1);

        // Set up a configuration for the RabbitMQ Source
        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("")
                .setPort()
                .setUserName("")
                .setPassword("")
                .setVirtualHost("")
                .build();
        // Initiating a Data Stream from RabbitMQ
        final DataStream<String> RMQstream = env
                .addSource(new RMQSource<String>(
                        connectionConfig,            // config for the RabbitMQ connection
                        "",                 // name of the RabbitMQ queue to consume
                        false,                        // use correlation ids; can be false if only at-least-once is required
                        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
                .setParallelism(1);              // parallel Source

        //Extraction of values of the Data Stream

        final DataStream<Triplet<String, Double, Long>> extractedDataStream = RMQstream.map(
                new RichMapFunction<String, Triplet<String, Double, Long>>() {
                    @Override
                    public Triplet<String, Double, Long> map(String s) throws Exception {
                        // Extract the payload of the message
                        String[] input = s.split(",");

                        // Extract the sensor ID
                        String sensorID = input[1];
                        String unformattedID = sensorID.split(":")[1];
                        String id = unformattedID.replaceAll(" ", "");

                        // Extract longitude
                        String sensorLONG = input[2];
                        String unformattedLONGTD = sensorLONG.split(":")[1];
                        String longtd = unformattedLONGTD.replaceAll(" ", "");

                        // Extract latitude
                        String sensorLAT = input[3];
                        String unformattedLATD = sensorLAT.split(":")[1];
                        String latd = unformattedLATD.replaceAll(" ", "");

                        // Extract the particulate matter
                        String sensorPM2 = input[6];
                        String unformattedPM2 = sensorPM2.split(":")[1];
                        String pm2String = unformattedPM2.replaceAll("[ }]+", "");

                        double pm2 = Double.valueOf(pm2String).doubleValue();

                        long ts = System.currentTimeMillis();

                        Triplet<String, Double, Long> sensorData = Triplet.with(id, pm2, ts);
                        return sensorData;
                    }

                }
        );

再次感谢,希望有人经历过这一点,或者可以只是指出(可能是明显的)错误,我正在做。

更新21.11.2019

我找到了解决问题的办法。我误解了“键控流”的概念。对于我的用例来说,在应用processwindowfunction时从窗口中获取单个值的结果,我根本不需要“keyedstreams”。相反,我必须使用以下代码:
在我的例子中,'.keyby'确实为每个sensorid构造了一个窗口。因此,当100个传感器(100个不同的ID)在很短的时间跨度(毫秒)内发送请求时,我得到100个窗口和100个processwindowfunction结果。
这不是我想要的,所以我必须使用“.windowall”操作来获得一个包含流中所有元素的窗口。后来我不得不应用“processallwindowfunction”而不是“processwindowfunction”,瞧:它成功了d

...
extractedDataStream
                //.filter(t -> t.getValue1() > 30) //This is just a use-case specific => 71/100 sensor requests have a higher value than 30.
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .process(new DetectTooHighAirPollution())
                .print();
...
public static class DetectTooHighAirPollution
            extends ProcessAllWindowFunction<Triplet<String, Double, Long>, String, TimeWindow> {

        @Override
        public void process(Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {

            long count = 0;

            for (Triplet<String, Double, Long> i : input) {
                count++;
            }

            if (count >= 10) {
                out.collect(count + " Sensors, report a too high concentration of PM2!");
            } else {
                out.collect("Upps something went wrong :/");
            }
        }
    }

干杯!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题