lagom服务使用来自kafka的输入

hkmswyz6  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(427)

我试图弄清楚如何使用lagom来使用通过kafka进行通信的外部系统的数据。
我浏览了lagom文档的这一部分,它描述了lagom服务如何通过订阅其主题与另一个lagom服务通信。

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
  Flow.fromFunction(doSomethingWithTheMessage)
)

但是,当您想要订阅一个包含由某个随机的外部系统生成的事件的Kafka主题时,适当的配置是什么?
这个功能需要某种适配器吗?为了澄清这一点,我现在有:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

我可以通过简单的post请求调用它。但是,我希望它能被 Data 一些(外部)Kafka主题的信息。
我想知道是否有这样一种方法可以以类似于这个模型的方式配置描述符:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _)
    .withAtMostOnceDelivery
}

我在googlegroups上遇到过这样的讨论,但是在ops的问题中,我看不出他实际上在做什么 EventMessage 他从哪里来 some-topic 除了将它们发送到服务定义的主题。

编辑#1:进度更新

查看文档,我决定尝试以下方法。我又增加了两个模块, aggregator-kafka-proxy-api 以及 aggregator-kafka-proxy-impl .
在新的api模块中,我定义了一个新的服务,没有方法,只有一个主题可以代表我的kafka主题:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

在impl模块中,我只做了标准实现

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}

现在,要真正消费这些事件,在我看来 aggregator-impl 模块中,我添加了一个“订户”服务,它接收这些事件,并在实体上调用适当的命令。

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}

这实际上允许我发布一条关于Kafka主题“数据输入”的消息,然后将其代理并转换为 RecordData 命令之前发出的实体消费。
不过,这对我来说似乎有点不妥。我和Kafka是由拉格姆的内在联系。我不能轻易地交换数据源。例如,如果我愿意,我将如何使用来自rabbitmq的外部消息?如果我试图从另一个Kafka(不同于拉格姆使用的Kafka)消费呢?

编辑#2:更多文档

我发现了一些关于lagom文档的文章,特别是:
第三方的消费主题
您可能希望lagom服务使用在lagom中未实现的服务上生成的数据。在这种情况下,如服务客户机部分所述,您可以在lagom项目中创建第三方服务api模块。该模块将包含一个服务描述符,声明您将从中使用的主题。一旦实现了第三方服务接口和相关类,就应该添加第三方服务api作为对您的高级服务impl的依赖。最后,您可以使用thirdpartyservice中描述的主题,如subscribe to a topic部分中所述。

dced5bon

dced5bon1#

我不使用 lagom 所以这也许只是个想法。但作为 akka-streams 是的一部分 lagom (至少我这么认为)-从这个解决方案中得到你需要的东西应该很容易。
我用了阿克卡流Kafka,这真的很好(我只做了一个原型)
当你消费信息时,你会做一些事情:

Consumer
      .committableSource(
          consumerSettings(..), // config of Kafka
          Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe
      .mapAsync(10) { msg =>
        business(msg.record) // do something
      }

检查书面文件
你在这里可以找到我的整个例子:pathmsgconsumer

5fjcxozz

5fjcxozz2#

艾伦·克里基奇在lightbend论坛上给出了答案。
第1部分:
如果您在业务服务中只使用外部kafka集群,那么您可以只使用lagom代理api来实现这一点。所以你需要:
使用仅包含主题定义的服务描述符创建api(未实现此api)
在您的业务服务中,根据您的部署配置kafka\u native(正如我在前一篇文章中提到的)
在您的业务服务中,从#1中创建的api注入服务,并使用lagom broker api subscriber订阅服务
补偿委托,在lagom代理api订户是开箱即用的。
第2部分:
kafka和amqp消费者实现需要持久的akka流。所以你需要处理断开连接。这可以通过两种方式实现:
通过在一个演员包裹它来控制peristant akka流。在actor预启动时初始化流,并将流完整地传递给将停止它的actor。如果流完成或失败,actor将停止。然后在actor backoff中使用restart策略 Package actor,在完成或失败时重新启动actor并重新初始化流
akka流延迟重启与退避阶段
我用的是#1,还没试过#2。
为#1初始化backoff actor或为#2初始化flow可以在您的lagom components trait中完成(基本上与您现在使用lagom broker api进行订阅的位置相同)。
在配置消费者时,请确保设置消费者组,以确保避免重复消费。您可以像lagom一样,使用描述符中的服务名作为使用者组名。

相关问题