错误的结果kstream kstream join具有非对称时间窗口

exdqitrt  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(313)

我有两个名为“alarm”和“intervention”的流,其中包含json。如果连接了报警和干预,则它们将具有相同的键。我想联系他们,检测所有24小时前没有干预的警报。
但是这个程序不起作用,结果给了我所有的警报,好像24小时前没有进行任何干预一样。我重新检查了我的数据集5次,有一些报警在报警日期前不到24小时进行了干预。
此图片说明情况:在此处输入图像描述
所以我需要知道在警报前是否有干预措施。
程序代码:

final KStream<String, JsonNode> alarm = ...;

    final KStream<String, JsonNode> intervention = ...;

    final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);

    final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
        return value != null;
    }).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode actualObj = null;

        if (rightValue == null) {//No intervention before
            try {
                actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
                        + "\"alarm\":" + leftValue.toString()
                        + "}");
            } catch (IOException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
            }
            return actualObj;
        } else {
            return null;
        }
    }, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));

    final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
        return value != null;
    });

    fraude.foreach((key, value) -> {
        rl.println("Fraude=" + key + " => " + value);
        System.out.println("Fraude=" + key + " => " + value);
    });

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);

    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            streams.close();
            rl.close();
            el.close();
            nfl.close();
        }
    }));

综上所述,我想检测红色矩形中的图案,在这里输入图像描述
p、 s:我确保在报警记录之前发送干预记录

roejwanj

roejwanj1#

m、 djx公司,
我认为现在在kafka streams中还没有一个完美的解决方案,但是我有一些想法可以让您更进一步。我准备在不久的将来提交一个kip来解决类似这样的用例。
一点:与ktable不同,kstream不是changelogs,因此较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一条溪流中。我想这就是你 foreach 使它看起来像所有的警报没有干预;你会看到干预前的中间连接事件。
例如:

LEFT   RIGHT    JOIN
a:1             a:(1,null)
       a:X      a:(1,X)
``` `foreach` 将在两个连接结果上调用,使其看起来缺少正确的值,而实际上只是有点晚了。
如果在结果流上应用一个时间窗口,您将得到一个changelog——较新的值将覆盖较旧的值。比如:

joinedAI
.groupByKey()
.windowedBy(
TimeWindows
.of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
.until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
).reduce(
new Reducer() {
@Override
public Long apply(final JsonNode value1, final JsonNode value2) {
return value2;
}
},
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
);

最糟糕的是,这将产生一个具有正确语义的changelog流,但是您仍然可以看到中间值,因此您也不希望直接从这个流触发任何操作(比如 `foreach` ).
你可以做的一件事是安排一项工作,每天一次,进行扫描 `"alerts-without-interventions"` 昨天的Windows。从窗口存储得到的任何结果都将是该键的最新值。
我正在准备的kip将提出一种方法,让您从窗口中过滤出中间结果,这将允许您将foreach附加到changelog,并仅在窗口的最终结果上触发它。
或者,如果应用程序的数据不是太大,并且不太担心边缘情况,可以考虑使用linkedhashmap或guava缓存自己实现“窗口最终事件”语义。
我希望这有帮助。

相关问题