akka模式匹配

mcdcgff0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(374)

我有一个actor示例,定义如下:

class KafkaPublisher[T <: KafkaMessage] extends Actor {

  override final def receive = {
    case ProducerStreamActivated(_, stream: SourceQueueWithComplete[T]) =>
      context.become(active(stream))

    case other => println("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[T]): Receive = super.receive orElse {
    case msg: T =>
      stream.offer(msg)

    case other => println("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props[T <: KafkaMessage](implicit tag: ClassTag[T]) = Props(new KafkaPublisher[T])
}

其中kafkamessage是一个标记特征,我想发布给kafka的每条消息都会扩展这个标记特征。我有一个监督演员,实际上将创建这个kafkapublisher演员的示例,这取决于kafka消息的类型。例如。,

case class MeterData(id: String, meterReadings: Map[DateTime, String]) extends KafkaMessage

因此,在我的supervisoractor中,我创建了一个publisheractor示例,如下所示:

def actorFor(props: Props, actorName: String) = context.actorOf(props, actorName)
val actorRef = actorFor(KafkaPublisher.props[MeterData], "meter-data-actor")

类似地,我为给定的kafkamessage类型创建相应的akka流。
所以我的问题是,如果我定义了一个通用的模式匹配,那么这个kafkapublisher-actor模式是否匹配meterdata。我的props方法中的classtag是否足够让我通过类型擦除?建议?

ztyzrc3y

ztyzrc3y1#

泛型类型上的模式匹配 T 当有一个隐含的 ClassTag[T] 在范围内。所以它应该足够提供和存储一个 ClassTag 在你的 KafkaPublisher :

import scala.reflect.ClassTag

class KafkaPublisher[T <: KafkaMessage : ClassTag] {
  /* ... */
}

object KafkaPublisher {
  def props[T <: KafkaMessage : ClassTag] = Props(new KafkaPublisher[T])
}

如果类型 T 底部是混凝土 props 调用站点,或者它是通用的,但是有一个隐式的 ClassTag[T] 好了,那你就给我打电话 props 和往常一样: actorFor(KafkaPublisher.props[MeterData], "meter-data-actor")

相关问题