我有两个名为“alarm”和“intervention”的流,其中包含json。如果连接了报警和干预,则它们将具有相同的键。我想联系他们,检测所有24小时前没有干预的警报。
但是这个程序不起作用,结果给了我所有的警报,好像24小时前没有进行任何干预一样。我重新检查了我的数据集5次,有一些报警在报警日期前不到24小时进行了干预。
此图片说明情况:在此处输入图像描述
所以我需要知道在警报前是否有干预措施。
程序代码:
final KStream<String, JsonNode> alarm = ...;
final KStream<String, JsonNode> intervention = ...;
final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);
final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
return value != null;
}).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
if (rightValue == null) {//No intervention before
try {
actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
+ "\"alarm\":" + leftValue.toString()
+ "}");
} catch (IOException ex) {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
}
return actualObj;
} else {
return null;
}
}, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));
final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
return value != null;
});
fraude.foreach((key, value) -> {
rl.println("Fraude=" + key + " => " + value);
System.out.println("Fraude=" + key + " => " + value);
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
rl.close();
el.close();
nfl.close();
}
}));
综上所述,我想检测红色矩形中的图案,在这里输入图像描述
p、 s:我确保在报警记录之前发送干预记录
1条答案
按热度按时间roejwanj1#
m、 djx公司,
我认为现在在kafka streams中还没有一个完美的解决方案,但是我有一些想法可以让您更进一步。我准备在不久的将来提交一个kip来解决类似这样的用例。
一点:与ktable不同,kstream不是changelogs,因此较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一条溪流中。我想这就是你
foreach
使它看起来像所有的警报没有干预;你会看到干预前的中间连接事件。例如:
joinedAI
.groupByKey()
.windowedBy(
TimeWindows
.of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
.until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
).reduce(
new Reducer() {
@Override
public Long apply(final JsonNode value1, final JsonNode value2) {
return value2;
}
},
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
);