具有Akka类型执行元和群集分片的优先级邮箱

bq9c1y66  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(134)

我有一个集群分片应用程序,其中有输入的参与者。参与者如下所示:

object TestActor {

  sealed trait Command
  final case class Inte(i: Int) extends Command
  final case class Stringaki(s: String) extends Command

  val TypeKey = EntityTypeKey[Command]("Test")

  def defaultThreadBehavior(id: String): Behavior[Command] = Behaviors.setup { ctx =>

    Behaviors.receiveMessage { cmd =>
      cmd match {
        case Inte(i) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received int: " + i)
          Thread.sleep(1000)
        case Stringaki(s) =>
          ctx.log.info(System.currentTimeMillis()/1000 + " Received string: " + s)
          Thread.sleep(1000)
      }
      Behaviors.same
    }
  }

}

演员是通过“碎片封套”创建的,如下所示:

val system_config = ConfigFactory.parseString(
      """
        |akka {
        |  actor {
        |    provider = "cluster"
        |    prio-dispatcher {
        |      type = "Dispatcher"
        |      mailbox-type = "PriorityMailbox"
        |    }
        |  }
        |  remote {
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = 2551
        |    }
        |  }
        |  cluster {
        |    seed-nodes = [
        |      "akka.tcp://TestApp@127.0.0.1:2551"
        |    ]
        |    sharding {
        |      number-of-shards = 10
        |      use-dispatcher = "akka.actor.prio-dispatcher"
        |    }
        |  }
        |}
        |""".stripMargin)

    val system = ActorSystem(Behaviors.empty[TestActor.Command], "TestApp",system_config)
    val sharding = ClusterSharding(system)

    val shardRegion = sharding.init(Entity(TestActor.TypeKey, ctx => defaultThreadBehavior(ctx.entityId)))

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Inte(i))
    }

    (0 to 9).foreach{
      i =>
        shardRegion ! ShardingEnvelope(0.toString, Stringaki(i.toString))
    }

两个for循环将消息发送到同一个参与者。第一个循环发送整数,第二个循环发送字符串。参与者处理消息时,它将休眠,以便在队列中建立消息并测试优先级。优先级邮箱在系统配置中配置,UnboundedPriorityMailbox实现如下:

class PriorityMailbox (settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

  PriorityGenerator {
    case Stringaki => 0
    case _ => 1
  }

)

为什么执行元按消息到达的顺序打印消息,而不考虑优先级生成器?

2vuwiymt

2vuwiymt1#

为什么你没有看到优先邮箱的效果,简单的答案是你的TestActor没有使用优先邮箱,而是使用默认邮箱。只有Akka Cluster分片系统使用优先邮箱。Cluster分片参考.conf akka.cluster.sharding.use-dispatcher的描述:


# The id of the dispatcher to use for ClusterSharding actors.

# If specified you need to define the settings of the actual dispatcher.

# This dispatcher for the entity actors is defined by the user provided

# Props, i.e. this dispatcher is not used for the entity actors.

确实,您发送的每一条消息都要经过优先级邮箱,但由于集群分片的内部参与者没有睡觉,因此不会产生积压(尽管在某些情况下,尤其是在内核较少的情况下,可能会有积压,而优先级可能会有所提高)。
要让实体参与者在具有优先级邮箱的调度程序中运行,您需要如下所示

val entityDispatcherProps = DispatcherSelector.fromConfig("akka.actor.prio-dispatcher")
val baseEntity = Entity(TestActor.TypeKey)(ctx => defaultThreadBehavior(ctx.entityId))
val shardRegion = sharding.init(baseEntity.withEntityProps(entityDispatcherProps))

相关问题