KafkaConsumer.PlainSource( consumerSettings, subscription) .RunForeach(result => { _ActorRef.Tell(result.Message.Value); }, materializer);
wvmv3b1j1#
我运行的是SimpleProducer和SimpleConsumer样本,它们被烘焙到Akka.Streams.Kafka repository中,而PlainSource的设计与您的设计几乎完全相同:
SimpleProducer
SimpleConsumer
PlainSource
KafkaConsumer.PlainSource(consumerSettings, subscription) .RunForeach(result => { Console.WriteLine($"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Message.Value}"); }, materializer);
下面是我的CPU利用率的情况-请记住,生产者不断地为我的消费者生产新事件:
这是非常低的资源消耗--这正是Akka.Streams及其所有插件(如Kafka)提供的开箱即用的功能。您的设置没有背压支持(因为IActorRef.Tell是非阻塞的),因此这个流将在您的系统中 * 全速 * 运行。无论您的参与者正在做什么,都可能是导致高CPU利用率的原因。你的other ticket is asking about how to add backpressure support to your Akka.Streams.Kafka application,所以我也会帮你回答这个问题。
IActorRef.Tell
1条答案
按热度按时间wvmv3b1j1#
我运行的是
SimpleProducer
和SimpleConsumer
样本,它们被烘焙到Akka.Streams.Kafka repository中,而PlainSource
的设计与您的设计几乎完全相同:下面是我的CPU利用率的情况-请记住,生产者不断地为我的消费者生产新事件:
这是非常低的资源消耗--这正是Akka.Streams及其所有插件(如Kafka)提供的开箱即用的功能。
您的设置没有背压支持(因为
IActorRef.Tell
是非阻塞的),因此这个流将在您的系统中 * 全速 * 运行。无论您的参与者正在做什么,都可能是导致高CPU利用率的原因。你的other ticket is asking about how to add backpressure support to your Akka.Streams.Kafka application,所以我也会帮你回答这个问题。