akka流组和过滤器

vmpqdwk3  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(243)

嗨,我现在的情况是我有一个

final case class Transaction(
    token: String,
    processingTime: Long
)

def rawSource: SourceWithContext[Either[NullJsonError, Json], CommittableOffset, Control] = Consumer
      .committableSource[String, Either[NullJsonError, Json]](
        consumerSettings(kafkaConfigs),
        Subscriptions.topics(kafkaConfigs.topic)
      )
      .asSourceWithContext(_.committableOffset)
      .map(_.record.value())
      .map {
        case Right(json) if json == Json.Null =>
          Left(NullJsonError)
        case other => other
      }

private val runnableGraph = rawSource
    .map(standardizerStage.stageMap) // This will convert Either[NullJsonError, Json] to  Either[NullJsonError, Transaction] 
    .collect {
      case Right(Some(value)) => {
        Right(value)
      }
      case Left(error) => Left(error)
    }
    .mapAsync(10)(saveToDB) // This will process the Transaction i.e. save to DB
    .asSource
    .map { case (_, committableOffset) => committableOffset }
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(Consumer.DrainingControl.apply)

现在我想把Kafka传来的信息按 Transaction.token 以5秒为间隔或5000条消息(以先到者为准),然后从每组中仅选取一个具有最高优先级的事务 Transaction.processingTime 把它传给db。
现在我知道我应该用

.groupedWithin(5000, 5.seconds)
      .mapConcat { group =>
        val commitableOffsetBatch = CommittableOffsetBatch.apply(group.map(_.committableOffset))
        commitableOffsetBatch.commitScaladsl()
        group
      }

但我不知道怎么用它来分组 token 并过滤 processingTime 之后每组 .map(standardizerStage.stageMap) 因为只有在 .map(standardizerStage.stageMap) 将有一个事务对象。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题