Kafka Streams session windows: aggregation fails intermittently

0h4hbjxa posted on 2021-06-08 in Kafka

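We are using Kafka Streams (0.10.2.0) to aggregate related events, grouping them with SessionWindows. About 50% of the time the aggregation does not seem to happen. Here is the scenario: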
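Request 1: all events were aggregated successfully
Group ID:
Events: e1; event time=2017-05-31T14:36:56.653Z  e2; event time=2017-05-31T14:36:56.653Z  e3; event time=2017-05-31T14:36:56.653Z
Request 2: none of the events were aggregated
Group ID: efg
Events: e1; event time=2017-05-31T14:36:56.653Z  e2; event time=2017-05-31T14:36:56.653Z  e3; event time=2017-05-31T14:36:56.653Z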
Stream processing details:
TimestampExtractor: the stream uses an event-time extractor to assign events to windows.
Window type: session window.
Session inactivity gap = 2 minutes
Streams config: CACHE_MAX_BYTES_BUFFERING_CONFIG=0
TIMESTAMP_EXTRACTOR_CLASS_CONFIG=eventtimeextractorimpl
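The event-time setup above can be illustrated with a small, JDK-only sketch. It assumes the eventtime field is an ISO-8601 UTC string, as in the examples, and it does not reproduce the application's eventtimeextractorimpl or Kafka's TimestampExtractor interface; the class and method names are hypothetical. It shows that all three events resolve to the same epoch timestamp, 1496241416653, which is the window start printed in the logs, and how two timestamps compare against a 2-minute inactivity gap.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not the application's eventtimeextractorimpl): shows how an
// ISO-8601 "eventtime" string maps to the epoch-millisecond timestamp that
// session windows are built on.
public class EventTimeDemo {

    // Parse the event's ISO-8601 UTC timestamp into epoch milliseconds.
    static long toEpochMillis(String eventTime) {
        return Instant.parse(eventTime).toEpochMilli();
    }

    // Two point-in-time events belong to the same session when they are at most
    // gapMs apart (the session inactivity gap).
    static boolean withinGap(long t1, long t2, long gapMs) {
        return Math.abs(t1 - t2) <= gapMs;
    }

    public static void main(String[] args) {
        long gapMs = Duration.ofMinutes(2).toMillis(); // 120000 ms
        long t = toEpochMillis("2017-05-31T14:36:56.653Z");
        System.out.println(t);                        // 1496241416653
        System.out.println(withinGap(t, t, gapMs));   // true
    }
}
```

Since e1, e2 and e3 carry identical event times, every pair is trivially within the gap, so by this reasoning all three should land in one session.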
Code snippet:

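import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

// GenericRecord, GenericCdsSerde, GenericListCdsSerde, _appProperties and logger
// are application classes/fields and are not shown here.

KStreamBuilder builder = new KStreamBuilder();
final KStream<String, GenericRecord> events = builder.stream(_appProperties.collationSourceTopic);
events.print();

// Group by the record key (the group ID) so all events of one group share sessions.
KGroupedStream<String, GenericRecord> groupedStream =
        events.groupByKey(Serdes.String(), GenericCdsSerde.GenericCdsSerde());

// Session windows with the configured inactivity gap (2 minutes in this scenario).
SessionWindows tmpSessionWindows = SessionWindows.with(
        TimeUnit.MINUTES.toMillis(Long.parseLong(_appProperties.collationWindowInMins)));

KTable<Windowed<String>, List<GenericRecord>> sessionizedAggregatedStream = groupedStream.aggregate(
        // Initializer: every new session starts with an empty list.
        ArrayList::new,
        // Aggregator: append the incoming event to the session's list.
        (aggKey, newValue, aggValue) -> {
            try {
                aggValue.add(newValue);
            } catch (Exception e) {
                logger.error("failed aggr session windows", e);
                return null; // on failure the error is logged and null is returned
            }
            return aggValue;
        },
        // Session merger: combine the lists of two sessions that are being merged.
        (aggKey, leftAggValue, rightAggValue) -> {
            try {
                leftAggValue.addAll(rightAggValue);
            } catch (Exception e) {
                logger.error("failed merging session windows", e);
                return null;
            }
            return leftAggValue;
        },
        tmpSessionWindows,                         /* session window */
        GenericListCdsSerde.GenericListCdsSerde(), /* serde for the aggregated list */
        "session-store");

sessionizedAggregatedStream.print();
sessionizedAggregatedStream.toStream().foreach((stringWindowed, s) ->
        logger.info("WindowedTable: window: " + stringWindowed.key()
                + " start ==> " + stringWindowed.window().start()
                + " end ==> " + stringWindowed.window().end()
                + " windowedValue: " + s));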
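To make the expected behaviour of the aggregator/merger pair concrete, here is a simplified in-memory model of session-window aggregation. It is an illustrative assumption, not Kafka's implementation: for each new event it looks up existing sessions of the same key that overlap the inactivity gap, merges their lists (the role of the merger lambda), then appends the new value (the role of the aggregator lambda). String values stand in for GenericRecord. With three events sharing one timestamp, the third call should return all three events, which matches the ng28 run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of session-window aggregation (NOT Kafka's implementation).
// It illustrates the expected result: a new event within the inactivity gap of an
// existing session is merged into it, so the aggregate keeps all earlier events.
public class SessionModel {

    // One session: its [start, end] range plus the aggregated values.
    static final class Session {
        long start;
        long end;
        final List<String> values = new ArrayList<>();
        Session(long start, long end) { this.start = start; this.end = end; }
    }

    private final long gapMs;
    private final Map<String, List<Session>> store = new HashMap<>();

    SessionModel(long gapMs) { this.gapMs = gapMs; }

    // Add one event and return the aggregate of the session it ends up in.
    List<String> add(String key, long ts, String value) {
        List<Session> sessions = store.computeIfAbsent(key, k -> new ArrayList<>());
        Session merged = new Session(ts, ts); // starts empty, like the initializer
        // Merge every existing session that overlaps [ts - gap, ts + gap].
        for (Session s : new ArrayList<>(sessions)) {
            if (s.end >= ts - gapMs && s.start <= ts + gapMs) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.values.addAll(s.values); // role of the session merger
                sessions.remove(s);
            }
        }
        merged.values.add(value); // role of the aggregator
        sessions.add(merged);
        return merged.values;
    }

    public static void main(String[] args) {
        SessionModel model = new SessionModel(120_000L); // 2-minute inactivity gap
        long t = 1496241416653L;                         // 2017-05-31T14:36:56.653Z
        model.add("ng30", t, "e1");
        model.add("ng30", t, "e2");
        System.out.println(model.add("ng30", t, "e3")); // [e1, e2, e3]
    }
}
```

Under this model, the ng30 output (each aggregate holding only the newest event) corresponds to the lookup of the existing session returning nothing.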

Log for the events that were grouped successfully: groupid: ng28
Event 1 arrives:
2017-06-01 09:57:23,861 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng28 eventid:1 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng28,{"eventid":"1","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng28@1496241416653],([{"eventid":"1","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]<-null)
2017-06-01 09:57:23,864 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng28start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"1","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]
Event 2 arrives:
2017-06-01 09:57:27,158 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng28 eventid:2 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng28,{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}
2017-06-01 09:57:27,158 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:164 - -------Raw stream: key:ng28 value:{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng28@1496241416653],([{"eventid":"1","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]<-null)
2017-06-01 09:57:27,160 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng28start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"1","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]
Event 3 arrives:
2017-06-01 09:57:31,481 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng28 eventid:3 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng28,{"eventid":"3","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}
2017-06-01 09:57:31,482 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:164 - -------Raw stream: key:ng28 value:{"eventid":"3","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng28@1496241416653],([{"eventid":"1","clienteventid":"123","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"3","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]<-null)
2017-06-01 09:57:31,484 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng28start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"1","clienteventid":"123","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"2","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"},{"eventid":"3","groupid":"ng28","eventtime":"2017-05-31T14:36:56.653Z"}]
Log for the events that failed to group: groupid: ng30. The event times for ng30 are the same as for group ng28.
Event 1 arrives:
2017-06-01 10:00:03,004 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng30 eventid:1 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng30,{"eventid":"1","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng30@1496241416653],([{"eventid":"1","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z","classname":"com.syniverse.cds.domain.event"}]<-null)
2017-06-01 10:00:03,007 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng30start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"1","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}]
Event 2 arrives:
2017-06-01 10:00:09,225 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng30 eventid:2 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng30,{"eventid":"2","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng30@1496241416653],([{"eventid":"2","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}]<-null)
2017-06-01 10:00:09,227 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng30start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"2","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}]
Event 3 arrives:
2017-06-01 10:00:14,546 INFO StreamThread-1 c.s.c.c.s.eventtimeextractor:49 - =======Read groupid:ng30 eventid:3 eventtime:2017-05-31T14:36:56.653Z
[kstream-source-0000000000]:ng30,{"eventid":"3","clienteventid":"123","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}
[kstream-aggregate-0000000003]:[ng30@1496241416653],([{"eventid":"3","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}]<-null)
2017-06-01 10:00:14,547 INFO StreamThread-1 c.s.c.c.s.csistreamfactory:196 - windowedtable:window:ng30start==>1496241416653 end==>1496241416653 windowedvalue:[{"eventid":"3","groupid":"ng30","eventtime":"2017-05-31T14:36:56.653Z"}]
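After the 2nd and 3rd events arrive, the previous events are missing from the aggregation step: each aggregate contains only the event that just arrived.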
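One way to flag this symptom automatically is a hypothetical helper, not part of the original code, that counts the events seen per group key and compares that count with the size of each emitted aggregate. It assumes every event for a group ID falls into a single session, as in the ng28 and ng30 runs, so a list shorter than the count means earlier events were lost.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical debugging aid: counts input events per group key and checks whether
// an emitted aggregate holds fewer events than have arrived for that key.
// Assumes all events of a key belong to one session.
public class AggregateCheck {

    private final Map<String, Integer> seen = new HashMap<>();

    // Call once per input event (e.g. where the raw-stream line is logged).
    void onEvent(String key) {
        seen.merge(key, 1, Integer::sum);
    }

    // Call for each emitted aggregate (e.g. inside the foreach on toStream()).
    // Returns true when the aggregate is shorter than the number of events seen.
    boolean isIncomplete(String key, List<?> aggregate) {
        return aggregate.size() < seen.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        AggregateCheck check = new AggregateCheck();
        for (int i = 0; i < 3; i++) {
            check.onEvent("ng30");
        }
        // After event 3, the ng30 aggregate held only event 3.
        System.out.println(check.isIncomplete("ng30", Arrays.asList("3"))); // true
    }
}
```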
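We tried different combinations:
1. cache.max.bytes.buffering: set to 0 and to 10 KB
2. commit.interval.ms: the default value, and values from 5 down to 2 seconds
Could this be caused by an incorrect or missing configuration? Thanks for your help.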
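The combinations above can be written as a plain java.util.Properties matrix. cache.max.bytes.buffering and commit.interval.ms are the actual Kafka Streams config names; the other settings a real application needs (application.id, bootstrap.servers and so on) are left out, 10 KB is assumed to mean 10 * 1024 bytes, and 2000 ms and 5000 ms stand in for the end points of the range since the exact intermediate values are not stated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch of the tried configuration combinations as plain Properties objects.
// Only the two settings under test are set; commit.interval.ms is omitted
// entirely for the "default" case so the library default applies.
public class ConfigMatrix {

    static List<Properties> combinations() {
        long[] cacheBytes = {0L, 10 * 1024L};          // 0 and 10 KB (assumed 10240 bytes)
        Long[] commitIntervals = {null, 2000L, 5000L}; // null = keep the default
        List<Properties> result = new ArrayList<>();
        for (long cache : cacheBytes) {
            for (Long commit : commitIntervals) {
                Properties props = new Properties();
                props.put("cache.max.bytes.buffering", Long.toString(cache));
                if (commit != null) {
                    props.put("commit.interval.ms", Long.toString(commit));
                }
                result.add(props);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Properties p : combinations()) {
            System.out.println(p);
        }
    }
}
```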
