我使用playframework(scala)作为微服务,并使用kafka作为事件总线。我有一个Map到如下事件类的事件使用者:
case class MovieEvent[T] (
mediaId: String,
config: T
)
object MovieEvent {
implicit def movieEventFormat[T: Format]: Format[MovieEvent[T]] =
((__ \ "mediaId").format[String] ~
(__ \ "config").format[T]
)(MovieEvent.apply _, unlift(MovieEvent.unapply))
}
object MovieProvider extends SerializableEnumeration {
implicit val providerReads: Reads[MovieProvider.Value] = SerializableEnumeration.jsonReader(MovieProvider)
implicit val providerWrites: Writes[MovieProvider.Value] = SerializableEnumeration.jsonWrites
val Dreamworks, Disney, Paramount = Value
}
消费者看起来像:
class MovieEventConsumer @Inject()(movieService: MovieService
) extends ConsumerRecordProcessor with LazyLogging {
override def process(record: IncomingRecord): Unit = {
val movieEventJson = Json.parse(record.valueString).validate[MovieEvent[DreamworksConfiguration]]
movieEventJson match {
case event: JsSuccess[MovieEvent[DreamworksJobOptions]] => processMovieEvent(event.get)
case er: JsError =>
logger.error("Unrecognized MovieEvent, attempting to parse as MovieUploadEvent: " + JsError.toJson(er).toString())
try {
val data = (Json.parse(record.valueString) \ "upload").as[MovieUploadEvent]
processUploadEvent(data)
} catch {
case er: Exception => logger.error("Unrecognized kafka event", er)
}
}
}
def processMovieEvent[T](event: MovieEvent[T]): Unit = {
logger.debug(s"Received movie event: ${event}")
movieService.createMovieJob(event)
}
def processUploadEvent(event: MovieUploadEvent): Unit = {
logger.debug(s"Received upload event: ${event}")
movieService.addToCollection(event)
}
}
现在,我只能验证三种不同的movieevent配置(梦工厂、迪斯尼和派拉蒙)中的一种。我可以交换我通过代码验证的那一个,但这不是重点。但是,我想验证这三种方法中的任何一种,而不必增加消费者。我试过玩弄一些不同的想法,但都没有。我对Kafka和Kafka还很陌生,不知道有没有一个好的方法可以做到这一点。
提前谢谢!
1条答案
按热度按时间xvw2m8pv1#
我将假设可能的配置数量是有限的,并且在编译时都是已知的(在您的示例中为3)。
一种可能性是
MovieEvent
具有泛型类型的密封特征T
. 下面是一个简单的例子:打印出来的
我省略了解析部分,因为我没有播放json atm的权限。但是由于你有一个封闭的层次结构,你可以一个接一个地尝试你的每一个选择(你必须这么做,因为我们不能静态地保证
DreamWorksEvent
没有与相同的json结构DisneyEvent
-您需要决定先尝试哪种类型,然后在第一种类型解析失败时回退到将json解析为另一种类型。现在你的另一个代码是非常通用的。要添加一个新的事件类型,只需添加另一个子类即可
MovieEvent
,以及确保您的解析逻辑处理新的情况。这里的神奇之处在于,您不必指定T
当提到MovieEvent
,因为你知道你有一个封闭的等级体系,因此可以恢复T
通过模式匹配。