scala—如何使用akka流在React式kafka中“组块并重新组合”大型消息

iecba09b  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(385)

当使用kafka发送一个大文件时,是否可以跨分区分发它,然后使用akka流重新组装它?如本演示所述:
http://www.slideshare.net/jiangjieqin/handle-large-messages-in-apache-kafka-58692297

afdcj2ne

afdcj2ne1#

“分块”方面,即制作人,很容易使用类似于React式Kafka的东西来写作:

case class LargeMessage(bytes : Seq[Byte], topic : String)

def messageToKafka(message : LargeMessage, maxMessageSize : Int) = 
  Source.fromIterator(() => message.bytes.toIterator)
        .via(Flow[Byte].grouped(maxMessageSize))
        .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq)))
        .runWith(Producer.plainSink(producerSettings)

“重新组装”,即消费者,可以采用与文档类似的方式实现:

val messageFut : Future[LargeMessage] = 
     for {
       bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte])
     } yield LargeMessage(bytes, topic)

相关问题