我正在努力建立一个apache flink流作业,它计算非常简单的物联网数据。它使用rabbitmq的(source),因此使用rmqsource。这很好,而且对这些数据的解析也很好。
然而,对于这个解析的数据流(triplet'string,double,long'(sensorid,value of'pm2,5',timestamp)类型),之后应用的函数似乎很奇怪。
首先,我想在传感器上设置流的密钥。
第二,我想创建一个窗口,其中包含每10秒或15秒根据id设置关键帧的所有元素。
第三,应该在这个窗口上执行一个非常基本的ProcessWindow函数,它只计算该窗口中的元素基本上,就像文档中的例子。
最后,processwindowfunction的输出应该打印到std.out。
你可以看到下面的相关部分。我使用jmeter和mqtt以及kafkameter插件发送测试数据,一次发送大约50个请求,然后看看会发生什么。
当我发送10个请求时,结果如下所示:
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
nope :(
对于我的逻辑,这意味着processwindowfunction为每个值计算,而不是为一个窗口计算一次。
我现在的问题是:
有什么我不知道的吗?为什么它不只计算一次函数并输出一个窗口中的元素数呢(我已经尝试了几十种不同的版本,通读了所有相关的stackoverflow帖子,阅读了大量不同的教程,等等,都没有效果)。
打印到std.out是否与此有关(也尝试将其写入文件=>相同的行为)?
我也试过用这个 .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
与 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
希望它能工作,但它没有。
非常感谢你的帮助。
extractedDataStream
.keyBy(t -> t.getValue0()) // keyed by sensor IDs
//.timeWindow(Time.seconds(10))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new DetectTooHighAirPollution())
.print();
// execute program
env.execute("MQTT Detection StreamingJob");
}
public static class DetectTooHighAirPollution
extends ProcessWindowFunction<Triplet<String, Double, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {
long count = 0;
for (Triplet<String, Double, Long> i : input) {
count++;
}
if (count > 1) {
out.collect("yap :D!: " + count);
} else {
out.collect("nope :(");
}
}
}
}
}
为了完整性,代码的其余部分将执行它应该执行的操作:
ps:我正在发送mqtt消息,有效负载是json对象,我现在“手动”解析它。
pps:已删除配置详细信息。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.javatuples.Triplet;
import java.io.IOException;
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//env.setParallelism(1);
// Set up a configuration for the RabbitMQ Source
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("")
.setPort()
.setUserName("")
.setPassword("")
.setVirtualHost("")
.build();
// Initiating a Data Stream from RabbitMQ
final DataStream<String> RMQstream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"", // name of the RabbitMQ queue to consume
false, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // parallel Source
//Extraction of values of the Data Stream
final DataStream<Triplet<String, Double, Long>> extractedDataStream = RMQstream.map(
new RichMapFunction<String, Triplet<String, Double, Long>>() {
@Override
public Triplet<String, Double, Long> map(String s) throws Exception {
// Extract the payload of the message
String[] input = s.split(",");
// Extract the sensor ID
String sensorID = input[1];
String unformattedID = sensorID.split(":")[1];
String id = unformattedID.replaceAll(" ", "");
// Extract longitude
String sensorLONG = input[2];
String unformattedLONGTD = sensorLONG.split(":")[1];
String longtd = unformattedLONGTD.replaceAll(" ", "");
// Extract latitude
String sensorLAT = input[3];
String unformattedLATD = sensorLAT.split(":")[1];
String latd = unformattedLATD.replaceAll(" ", "");
// Extract the particulate matter
String sensorPM2 = input[6];
String unformattedPM2 = sensorPM2.split(":")[1];
String pm2String = unformattedPM2.replaceAll("[ }]+", "");
double pm2 = Double.valueOf(pm2String).doubleValue();
long ts = System.currentTimeMillis();
Triplet<String, Double, Long> sensorData = Triplet.with(id, pm2, ts);
return sensorData;
}
}
);
再次感谢,希望有人经历过这一点,或者可以只是指出(可能是明显的)错误,我正在做。
更新21.11.2019
我找到了解决问题的办法。我误解了“键控流”的概念。对于我的用例来说,在应用processwindowfunction时从窗口中获取单个值的结果,我根本不需要“keyedstreams”。相反,我必须使用以下代码:
在我的例子中,'.keyby'确实为每个sensorid构造了一个窗口。因此,当100个传感器(100个不同的ID)在很短的时间跨度(毫秒)内发送请求时,我得到100个窗口和100个processwindowfunction结果。
这不是我想要的,所以我必须使用“.windowall”操作来获得一个包含流中所有元素的窗口。后来我不得不应用“processallwindowfunction”而不是“processwindowfunction”,瞧:它成功了d
...
extractedDataStream
//.filter(t -> t.getValue1() > 30) //This is just a use-case specific => 71/100 sensor requests have a higher value than 30.
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)))
.process(new DetectTooHighAirPollution())
.print();
...
public static class DetectTooHighAirPollution
extends ProcessAllWindowFunction<Triplet<String, Double, Long>, String, TimeWindow> {
@Override
public void process(Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {
long count = 0;
for (Triplet<String, Double, Long> i : input) {
count++;
}
if (count >= 10) {
out.collect(count + " Sensors, report a too high concentration of PM2!");
} else {
out.collect("Upps something went wrong :/");
}
}
}
干杯!
暂无答案!
目前还没有任何答案,快来回答吧!