我有这样一个例子:用户收集订单作为订单行。我用kafka主题实现了这一点,kafka主题包含有顺序更改的事件,它们被合并,存储在本地键值存储中,并作为第二个主题以顺序版本进行广播。
我需要以某种方式对废弃的订单做出React-这些订单已经启动,但至少在过去x小时内没有变化。
简单的解决方案是每y分钟扫描一次本地存储,然后将订单状态更改为“已放弃”的事件后处理。似乎我不能访问存储不是从处理器。。。但它也不是很优雅的编码。欢迎提出任何建议。
--编辑
我不能只将puctuation添加到merge/validation transformer,因为它的输出不同,应该路由到其他地方,比如在这个图像上(单个kafka streams应用程序):
所以“废弃订单处理器/转换器”对于它的输入来说是不可操作的(这里唯一的触发器是时间)。另一件事是,在这种情况下(如在图像上),我的转换器在初始化时得到forwardingdisabledprocessorcontext,所以我不能以标点符号发出任何消息。我可以传递kafkatemplatebean并生成新消息,但整个处理器/转换器只是空壳,只能访问本地存储。。。
这是我使用的代码片段:
public class AbandonedOrdersTransformer implements ValueTransformer<OrderEvent, OrderEvent> {
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
stateStore = (KeyValueStore)processorContext.getStateStore(KafkaConfig.OPENED_ORDERS_STORE);
//main scheduler
this.context.schedule(TimeUnit.MINUTES.toMillis(5), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
KeyValueIterator<String, Order> iter = this.stateStore.all();
while (iter.hasNext()) {
KeyValue<String, Order> entry = iter.next();
if(OrderStatuses.NEW.equals(entry.value.getStatus()) &&
(timestamp - entry.value.getLastChanged().getTime()) > TimeUnit.HOURS.toMillis(4)) {
//SEND ABANDON EVENT "event"
context.forward(entry.key, event);
}
}
iter.close();
context.commit();
});
}
@Override
public OrderEvent transform(OrderEvent orderEvent) {
//do nothing
return null;
}
@Override
public void close() {
//do nothing
}
}
暂无答案!
目前还没有任何答案,快来回答吧!