使用alpakka将1个输入连接到n个输出

voase2hg  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(347)

我正在尝试将生产者与消费者连接起来的一些变体,在这种特殊情况下,有时我需要为每条消息生成一条额外的消息(例如,一条消息指向输出主题,另一条消息指向另一个主题),同时保证这一点。
我在考虑执行mapconcat并输出多个producerrecord对象,我担心在边缘情况下,第一条消息足以使提交在该偏移量上发生,从而导致第二条消息的潜在丢失。此外,似乎你不能只做.flatmap,因为你将进入graph api,这会变得更加混乱,因为这样就很难确保一旦你合并到一个提交流,你不只是忽略重复的偏移量。

Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
  .map(msg => (msg, addLineage(msg.record.value())))
  .mapConcat(input => 
    if (math.random > 0.25) 
      List(ProducerMessage.Message(
        new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
        input._1.committableOffset
      ))
    else List(ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
      input._1.committableOffset
    ),ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
      input._1.committableOffset
    ))
  )
  .via(Producer.flow(producerSettings))
  .map(_.message.passThrough)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
    (batch, elem) => batch.updated(elem)
  }
  .mapAsync(parallelism = 3)(_.commitScaladsl())
  .runWith(Sink.ignore)

原始1比1文档如下:https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-生产者和消费者
有人想到/解决了这个问题吗?

zu0ti5jz

zu0ti5jz1#

alpakka-kafka连接器最近推出了 flexiFlow 它支持您的用例:让一个stream元素向kafka生成多条消息

相关问题