我正在尝试从单个输入流创建多个输出流(取决于不同的时间窗口)。
interface AnalyticsBinding {
String PAGE_VIEWS_IN = "pvin";
String PAGE_VIEWS _COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
String PAGE_VIEWS _COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";
@Input(PAGE_VIEWS_IN)
KStream<String, PageViewEvent> pageViewsIn();
@Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
KStream<String,Long> pageViewsCountOutLast5Minutes();
@Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
KStream<String,Long> pageViewsCountOutLast30Minutes();
}
@StreamListener
@SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
public KStream<String, Long> processPageViewEventForLast5Mintues(
@Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
// aggregate by Duration.ofMinutes(5)
}
@StreamListener
@SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
public KStream<String, Long> processPageViewEventForLast30Mintues(
@Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
// aggregate by Duration.ofMinutes(30)
}
当我启动应用程序时,只有一个流任务可以工作,有没有办法让ProcessPageViewEventforLast5Minutes和ProcessPageViewEventforLast30minutes同时工作
1条答案
按热度按时间zynd9foi1#
您在两个处理器中使用相同的输入绑定,这就是为什么您只看到一个在工作。在绑定接口中添加另一个输入绑定,并将其目标设置为同一主题。另外,换一个
StreamListener
方法来使用此新绑定名称。也就是说,如果您使用的是最新版本的springcloudstream,那么您应该考虑迁移到功能模型。例如,以下操作应起作用。
和
在这种情况下,绑定器会自动创建两个不同的输入绑定。您可以在这些绑定上设置目的地。