当使用kafka发送一个大文件时,是否可以跨分区分发它,然后使用akka流重新组装它?如本演示所述:http://www.slideshare.net/jiangjieqin/handle-large-messages-in-apache-kafka-58692297
afdcj2ne1#
“分块”方面,即制作人,很容易使用类似于React式Kafka的东西来写作:
case class LargeMessage(bytes : Seq[Byte], topic : String) def messageToKafka(message : LargeMessage, maxMessageSize : Int) = Source.fromIterator(() => message.bytes.toIterator) .via(Flow[Byte].grouped(maxMessageSize)) .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq))) .runWith(Producer.plainSink(producerSettings)
“重新组装”,即消费者,可以采用与文档类似的方式实现:
val messageFut : Future[LargeMessage] = for { bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte]) } yield LargeMessage(bytes, topic)
1条答案
按热度按时间afdcj2ne1#
“分块”方面,即制作人,很容易使用类似于React式Kafka的东西来写作:
“重新组装”,即消费者,可以采用与文档类似的方式实现: