我是分组输入数据的基础上的关键,然后做1分钟窗口30秒跳和在聚合器。
数据正在被一个应用程序发送和使用,对这个应用程序的需求可能会在未来发展,因此,我认为未来需要灵活性和快速变化。
当前逻辑描述如下:
@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {
JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);
DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}
DataAggregator.java
public class DataAggregator {
private List<String> dataList = new ArrayList<>();
public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}
public List<String> getDataList() {
return dataList;
}
}
然而,鉴于不断发展的需求,我想给用户的可能性,改变通过菜单的逻辑。
例如,用户可以随意更改窗口或更改数据隔离的方式。
我可能在考虑编写几个java类,当用户选择特定选项时,这些类可以打开和关闭。
但我想知道是否可以做些更好、更有活力的事情。
1条答案
按热度按时间fquxozlt1#
使用flink,有些事情在作业运行时无法更改--值得注意的是,作业图的拓扑结构和运算符的并行性。
另一方面,可以在整个集群中广播控制流,以对业务逻辑进行动态更改。在简单的情况下,这已被用来修改滤波器参数;例如,在更复杂的情况下,它被用来触发代码的动态加载或转换中使用的机器学习模型(例如,通过广播pmml)。
示例用例:rbea:king的可伸缩实时分析、流媒体模型、ing如何添加模型。
不太明显的是如何动态地重新配置聚合。开源欺诈检测演示(第1部分,第2部分,github)演示了如何实现这一点。
有关另一个示例,请参见cogynt:flink without code。