我需要存储到Cassandra和出版Kafka多个事件,并呼吁一些最终决定 handler()
仅在存储和发布所有事件之后。
我遇到了只在所有事件都被持久化之后才更新actor state的方法,但它不包括事件也应该发布到kafka的情况。
kafka发布者和基本聚合根参与者处理多个事件,然后调用 handler()
(通常用于返回参与者的响应):
abstract class AggregateRootActor () extends ActorPersistence {
def processEvents(events: Seq[Event])(handler: Event => Unit): Unit = {
persistAll(events) { persistedEvent =>
state = //updateActorState
//publish messages to kafka
val futureResult = publisher.publishToKafka(event)
// where to switch context to handle `EventProcessingCompleted` after all events are
// published?
context.become {
case EventProcessingCompleted => handler(persistedEvent)
case ... //
}
}
self ! EventProcessingCompleted
}
}
欢迎提出任何解决方案!
1条答案
按热度按时间ldxq2e6h1#
我会这样构造它,假设你不想让演员回复,直到事件持续到Cassandra(为了将来的补水)和Kafka(大概是为了广播到其他系统)
另一种可能更诚实地对待一致性权衡的方法是,让参与者设置一个从akka持久性查询到kafka生产者的流,按照以下思路进行kafka制作:
为了加强对kafka的生产保证,让应用程序的组件(可以是集群单例或分片)跟踪何时加载了持久性id,并加载最近最少使用的持久性id,以确保查询流运行,这可能很有用。