阿克卡:坚持Cassandra和出版Kafka多个事件

nszi6y05  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我需要存储到Cassandra和出版Kafka多个事件,并呼吁一些最终决定 handler() 仅在存储和发布所有事件之后。
我遇到了只在所有事件都被持久化之后才更新actor state的方法,但它不包括事件也应该发布到kafka的情况。
kafka发布者和基本聚合根参与者处理多个事件,然后调用 handler() (通常用于返回参与者的响应):

abstract class AggregateRootActor () extends ActorPersistence {

    def processEvents(events: Seq[Event])(handler: Event => Unit): Unit = {
     persistAll(events) { persistedEvent =>
       state = //updateActorState

       //publish messages to kafka
       val futureResult = publisher.publishToKafka(event)

       // where to switch context to handle `EventProcessingCompleted` after all events are 
       // published?
       context.become {
          case EventProcessingCompleted => handler(persistedEvent)
          case ... // 
       }
      }
     self ! EventProcessingCompleted
    }

 }

欢迎提出任何解决方案!

ldxq2e6h

ldxq2e6h1#

我会这样构造它,假设你不想让演员回复,直到事件持续到Cassandra(为了将来的补水)和Kafka(大概是为了广播到其他系统)

// includes the event and anything else you'd want the handler to have,
//  e.g. where to send replies
case class EventProcessingCompleted(...)

persistAll(events) { persistedEvent =>
  state = ???

  // Other state changes (e.g. becomes) here

  publisher.publishToKafka(event).map(_ => EventProcessingCompleted(event)).pipeTo(self)
}

另一种可能更诚实地对待一致性权衡的方法是,让参与者设置一个从akka持久性查询到kafka生产者的流,按照以下思路进行kafka制作:

val readJournal = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// Spin this up after recovery has completed
val kafkaProductionStream =
  readJournal.eventsByPersistenceId(actorId, state.lastIdToKafka, Long.MaxValue)
    .mapAsync(1) { eventEnvelope =>
      publisher.publishToKafka(eventEnvelope._4.asInstanceOf[???]).map(_ => eventEnvelope._3)
    }
    .mapAsync(1) { sequenceNr => 
      self ? RecordKafkaProductionFor(sequenceNr)
    }

// run the stream etc.

// persist the highwater mark for sequence numbers produced to Kafka and update state

// can now consider persistence to Cassandra to imply production to Kafka, so 
//  can reply after persist to Cassandra

为了加强对kafka的生产保证,让应用程序的组件(可以是集群单例或分片)跟踪何时加载了持久性id,并加载最近最少使用的持久性id,以确保查询流运行,这可能很有用。

相关问题