当akka流中的JsonFraming出现EOF时,添加自定义逻辑/回调/处理程序

ui7jx7zq  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(201)

我有一个流程,在这个流程中,我以小批量的方式从Kafka pah主题中使用文件的路径,读取文件本身(大JSON数组),然后将它们写回到Kafka数据主题中。
它看起来像这样:

val fileFlow = Flow[Path].flatMapConcat(HdfsSource.data(fs, _))
        .via(JsonFraming.objectScanner(Int.MaxValue))

      Consumer
        .committableSource(newConsumerSettings, Subscriptions.topics(inputTopicNames))
        .map(value => value)
        .grouped(kafkaConsumerBatch)
        .flatMapConcat(paths => Source(paths))
        .map(path => new Path(path.record.value().get("fullPath").asInstanceOf[String]))
        //Based on: https://github.com/akka/alpakka/blob/v3.0.0/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala#L72-L92
        .via(PassThroughFlow(fileFlow))
        .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
        .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
        .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
        .map { case (record, entityConfig) =>
          producerMessagesToTopic.mark()
          ProducerMessage.single(
            new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
            passThrough = entityConfig)
        }
        .via {
          akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
        }
....More logic for logging and running/materializing the flow

现在,问题是,正如我所说的,这些JSON文件很大,所以我不能将整个文件内容放入单独的对象中,将它们全部存储到Kafka中,然后才提交。我的意思是,这是我需要做的,但我还需要根据EOF事件控制偏移提交。
我想让Producer以自己的速度向Kafka发送数据,而不管它的配置如何,但以某种方式将我的自定义逻辑注入到EOF事件中。也许像passThrough字段这样的东西表示文件已经被完全使用,我们现在可以提交上游路径主题的偏移量。
objectScanner在其定义中有一个GraphStageLogic,它有onUpstreamFinish回调,但是没有直接访问它来重写。像SimpleLinearGraphStageJsonObjectParser这样的类被标记为内部API。

t0ybt7op

t0ybt7op1#

我被
......我无法将整个文件内容放入单独的对象中,将它们全部存储到Kafka中,然后才提交
因为看起来(如果我理解错了,你可以评论一下),偏移量提交实际上是一种确认,表明你已经完全处理了一个文件,所以没有办法不提交偏移量,直到文件中位于该偏移量的消息中的所有对象都被生成给Kafka。
Source.via(Flow.flatMapConcat.via(...)).map.via(...)的缺点是它是一个单一的流,并且第一个和第二个via之间的所有操作(包括第一个和第二个)都需要一段时间。
如果您可以在输出主题中交叉使用来自文件的对象,并且可以不可避免地将来自给定文件的对象两次生成到输出主题(这两种情况可能会也可能不会对该主题的下游消费者的实现施加有意义的约束/困难),那么您可以并行化文件的处理。mapAsync流阶段在这方面特别有用:

import akka.Done

// assuming there's an implicit Materializer/ActorSystem (depending on the version of Akka Streams you're running) in scope
def process(path: Path): Future[Done] =
  Source.single(path)
    .via(PassThroughFlow(fileFlow))
    .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
    .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
    .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
    .map { case (record, entityConfig) =>
      producerMessagesToTopic.mark()
      ProducerMessage.single(
        new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
        passThrough = entityConfig)
    }
    .via {
      akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
    }
    .runWith(Sink.ignore)

 // then starting right after .flatMapConcat(paths => Source(paths))
 .mapAsync(parallelism) { committableMsg =>
   val p = new Path(committableMsg.record.value().get("fullPath").asInstanceOf[String])
   process(p).map { _ => committableMsg.committableOffset }
 }
 // now have the committable offsets

parallelism会限制你在给定时间处理的路径数,并保持提交程序的顺序(即在所有消息都被完全处理之前,偏移量不会到达提交程序)。

相关问题