akka stream onnext

mxg2im7a  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(343)

我只是遵循了akka stream actorpublisher的示例,有时我得到了以下消息:
java.lang.illegalstateexception:流未请求元素时不允许onnext,totaldemand为0
看着这些文件,他们解释说:
通过调用onnext将元素发送到流。您可以发送流订户请求的任意多个元素。这个数额可以用总需求来查询。只有当isactive和totaldemand>0时才允许使用onnext,否则onnext将抛出illegalstateexception。
当流订阅服务器请求更多元素时,actorpublishermessage.request消息将传递到此参与者,您可以对该事件执行操作。总需求将自动更新。
如何防止总需求为零?当我收到这个错误时,我丢失了我试图发送的信息。
以下是我一直遵循的示例:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-rc3/scala/stream-integrations.html
这是我的班级考试

object Test extends App {

  implicit val actorSystem = ActorSystem("ReactiveKafka")
  implicit val materializer = ActorFlowMaterializer()

  val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
  val publisher = kafka.consume("test", "groupName", new StringDecoder())

  val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")

  Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))

}

好吧,我收到了Kafka的消息,我正在传递给workeractor,但是当每秒向Kafka发送大约10条消息时,由于这个错误,有些消息丢失了。
更新
我遇到了这里描述的错误(使用相同的库):
https://github.com/softwaremill/reactive-kafka/issues/11
我用一个缓冲区解决了我的问题,但是看起来这个pr可以解决这个问题。
https://github.com/softwaremill/reactive-kafka/pull/13

sgtfey8w

sgtfey8w1#

如果下游Flume没有任何需求,那么你唯一的选择就是
告诉数据源 Worker 没有需求,因此源可以停止生成消息,直到有更多的需求进来(React式解决方案)。
缓冲消息,直到你从接收器得到一些需求,这可能会填满你的缓冲区,你无论如何都会丢弃消息。
当需求为0时删除消息(这似乎是您当前的实现)。
但“背压”的全部目的是防止在没有需求时调用onnext。
要实现上面的缓冲选项,您可以在actor内部或外部缓冲:
内部缓冲区:查看文档中的“actorpublisher”示例,以获取为actorpublisher提供缓冲的actor示例。
外部缓冲区:使用缓冲物化器或 Flow.buffer 在你的小溪里。

相关问题