Hi, here is my current setup. I have a
final case class Transaction(
  token: String,
  processingTime: Long
)
def rawSource: SourceWithContext[Either[NullJsonError, Json], CommittableOffset, Control] =
  Consumer
    .committableSource[String, Either[NullJsonError, Json]](
      consumerSettings(kafkaConfigs),
      Subscriptions.topics(kafkaConfigs.topic)
    )
    .asSourceWithContext(_.committableOffset)
    .map(_.record.value())
    .map {
      case Right(json) if json == Json.Null =>
        Left(NullJsonError)
      case other => other
    }
private val runnableGraph = rawSource
  .map(standardizerStage.stageMap) // This will convert Either[NullJsonError, Json] to Either[NullJsonError, Transaction]
  .collect {
    case Right(Some(value)) => Right(value)
    case Left(error)        => Left(error)
  }
  .mapAsync(10)(saveToDB) // This will process the Transaction i.e. save to DB
  .asSource
  .map { case (_, committableOffset) => committableOffset }
  .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
  .mapMaterializedValue(Consumer.DrainingControl.apply)
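Now I want to group the messages coming from Kafka by Transaction.token, in windows of 5 seconds or 5000 messages (whichever comes first), then pick from each group only the one transaction with the highest Transaction.processingTime and pass that one to the DB.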
I know I should use something like
.groupedWithin(5000, 5.seconds)
.mapConcat { group =>
  val commitableOffsetBatch = CommittableOffsetBatch.apply(group.map(_.committableOffset))
  commitableOffsetBatch.commitScaladsl()
  group
}
But I don't know how to use it to group by token and filter on processingTime after .map(standardizerStage.stageMap), because only after .map(standardizerStage.stageMap) do I have a Transaction object.
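To make the requirement concrete, here is a minimal sketch of the "one winner per token" step as a pure function, so it can be tried without Kafka or Akka. The object name LatestPerToken, the method latestPerToken, and the generic Ctx type (standing in for CommittableOffset) are all made up for illustration. The wiring shown in the trailing comment is untested and rests on assumptions: that stageMap returns an Either wrapping an Option (as the collect in the graph above suggests), that saveToDB accepts the Either and returns a Future, that an implicit ExecutionContext is in scope, and that the stream has to be turned into a plain Source with .asSource before groupedWithin can be called.

```scala
final case class Transaction(token: String, processingTime: Long)

object LatestPerToken {

  /** For one batch, keep per token only the element with the highest
    * processingTime. The context (for example the Kafka offset) travels
    * along with the chosen transaction. The order of the result is not
    * specified, because it comes out of a Map.
    */
  def latestPerToken[Ctx](batch: Seq[(Transaction, Ctx)]): Seq[(Transaction, Ctx)] =
    batch
      .groupBy { case (tx, _) => tx.token }               // token -> all entries for that token
      .values
      .map(_.maxBy { case (tx, _) => tx.processingTime }) // groups are never empty here
      .toSeq
}

// Possible wiring (untested sketch, see the assumptions in the lead-in):
//
//   rawSource
//     .map(standardizerStage.stageMap)
//     .collect {
//       case Right(Some(value)) => Right(value)
//       case Left(error)        => Left(error)
//     }
//     .asSource                          // elements become (Either[...], CommittableOffset)
//     .groupedWithin(5000, 5.seconds)    // 5000 elements or 5 seconds, whichever comes first
//     .mapAsync(1) { batch =>
//       // every offset in the window is committed, including the discarded ones
//       val offsets = CommittableOffsetBatch(batch.map(_._2))
//       val winners = LatestPerToken.latestPerToken(
//         batch.collect { case (Right(tx), offset) => (tx, offset) }
//       )
//       Future
//         .traverse(winners) { case (tx, _) => saveToDB(Right(tx)) }
//         .map(_ => offsets)             // emit the batch only after the saves succeed
//     }
//     .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
//     .mapMaterializedValue(Consumer.DrainingControl.apply)
```

For example, a batch holding ("a", 1), ("b", 5), ("a", 3), ("b", 2) would be reduced to ("a", 3) and ("b", 5). In the wiring sketch the offsets are handed to Committer.sink only after the saves complete, instead of calling commitScaladsl() inside mapConcat, so a failed save does not leave already-committed offsets behind. The same token can still be saved again in a later window, since the grouping only looks at one window at a time.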