我有一个应用程序,它接收一个API请求,并将其转发给一个Kafka API Producer。每个请求都调用这个Producer来向Kafka发送一条消息。这个Producer存在于应用程序的整个生命周期中,并为所有请求共享。producer.send(new ProducerRecord[String, String](topic, requestBody))
这样就可以了。现在我想使用alpakka Producer来完成这个任务。代码如下所示:
val kafkaProducer = producerSettings.createKafkaProducer()
val settingsWithProducer = producerSettings.withProducer(kafkaProducer)
val done = Source.single(requestBody)
.map(value => new ProducerRecord[String, String](topic, value))
.runWith(Producer.plainSink(settingsWithProducer))
与普通的生产者相比,alpakka Producer有什么优势?我不知道新方法是否能帮助我同时按顺序处理大量的API请求。
1条答案
按热度按时间gzjq41n41#
对于生成一个Kafka主题的消息的情况,您使用的Alpakka Producer接收器并没有真正提供什么好处(唯一的好处 * 可能 * 是如果您对使用Akka Discovery来发现您的Kafka代理感兴趣)。在这种情况下,Alpakka的
SendProducer
在Scala代码中可能会很有用:它公开了ScalaFuture
而不是JavaFuture
。Alpakka Producer在流上下文中发挥作用的地方是,在流上下文中,您希望使用反压力按顺序生成一系列元素,特别是当要生成的消息是复杂流拓扑的输出时。
我认为“大量的API请求”是指HTTP/gRPC请求进入您的服务,并且每个请求最多产生一条消息给Kafka。您可以将这样的请求扭曲成流(例如,通过
Source.actorRef
提供流),但这可能过于复杂。至于“在订货的同时”:这是一种矛盾,因为“in order”在某种程度上排除了同时性。您是否在考虑这样一种情况,即您可以对请求进行分区,然后您希望在请求的分区内进行排序,但是跨分区的任何排序都可以(请注意,我并不一定暗示您要对Kafka主题进行划分)?在这种情况下, akka 流(和可能的演员)将派上用场,生产者汇/流将可能派上用场。