我不确定这是否可行,但以下是我的设想:
我有一个长为key的ktable和一个arraylist作为value
当ktable中某个特定键的值(id的arraylist)改变时,我想为arraylist中的每个id发布一条主题消息。
假设我有记录 <1, [2,3]>
在ktable中。如果现在更改为 <1, [2,4]>
,我想在上发布消息 output_topic
- <2, Object>, <4,Object>
. 对象派生自ktable值中的其他属性。
为此,我一直在考虑通过提供一个定制将ktable转换为kstream KeyValueMapper
然后发布一个主题的流。我在网上看到一个问题 KeyValueMapper
一次只能转换一条消息,因此适合一对一Map。在我的场景中,我想将一条传入消息转换为多条传出消息。这是我的密码:
final KeyValueMapper<Long, CustomGroup, KeyValue<Long, CustomCluster>> outputMapper = new KeyValueMapper<Long, CustomGroup,KeyValue<Long, CustomCluster>>() {
public KeyValue<Long, CustomCluster> apply(Long key, CustomGroup value) {
ArrayList<KeyValue<Long, CustomCluster>> result = new ArrayList<>(value.compositeIds.size());
for(Long compositeId : value.compositeIds) {
CustomCluster c = new CustomCluster(value.appSettingID, value.appSettingName, compositeId, value.user);
c.clusters = new HashSet<Cluster>();
c.clusters.add(new Cluster(c.clusterId, c.clusterName));
result.add(new KeyValue<>(compositeId, c));
}
return result;
}
};
KStream<Long, CustomCluster> outputStream = cgTable.toStream(outputMapper);
结果是包含我所有消息的列表。这段代码给了我一个语法错误,因为它希望返回一个keyvalue对象,而不是keyvalue对象的arraylist。
有没有办法让outputmapper按照我设想的方式工作?有没有别的方法来完成这项任务。基本上,我需要发送多条消息,这些消息是从某个特定主题的ktable record值派生的。
当传入的更改在kstream而不是ktable中时,我能够完成任务。kstream提供了一个flatmap方法来实现这一点。
final KeyValueMapper<Long, CustomGroup, Iterable<KeyValue<Long, CustomCluster>>> outputMapper = new KeyValueMapper<Long, CustomGroup, Iterable<KeyValue<Long, CustomCluster>>>() {
public Iterable<KeyValue<Long, CustomCluster>> apply(Long key, CustomGroup value) {
ArrayList<KeyValue<Long, CustomCluster>> result = new ArrayList<>(value.compositeIds.size());
for(Long compositeId : value.compositeIds) {
CustomCluster c = new CustomCluster(value.appSettingID, value.appSettingName, compositeId, value.user);
c.clusters = new HashSet<Cluster>();
c.clusters.add(new Cluster(c.clusterId, c.clusterName));
result.add(new KeyValue<>(compositeId, c));
}
return result;
}
};
KStream<Long, CustomCluster> outputStream = cgStream.flatMap(outputMapper);
所以问题是ktable是否提供了与kstream的flatmap方法等价的方法。我希望我的问题不要太复杂。谢谢你的帮助
暂无答案!
目前还没有任何答案,快来回答吧!