当前状态:
今天,我构建了一个spark结构化流应用程序,它使用一个kafka主题,其中包含json消息。嵌入在kafka主题的值中包含一些关于消息字段的源和模式的信息。消息的简化版本如下所示:
{
"source": "Application A",
"schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
"message": {"countryId": "21", "name": "Poland"}
}
今天的系统中有一些kafka主题,我使用subscribe选项为每个主题部署了这个spark结构化流应用程序。应用程序应用主题的唯一模式(通过批量读取kafka主题中的第一条消息并Map该模式进行攻击),并以parquet格式将其写入hdfs。
所需状态:
我的组织将很快开始产生越来越多的主题,我不认为每个主题的spark应用程序模式能够很好地扩展。起初,subscribepattern选项似乎对我很有用,因为这些主题有一种层次结构的形式,但现在我只能将模式应用于hdfs中的不同位置。
在未来,我们很可能会有数千个主题,希望只有25个左右的Spark应用程序。
有人对如何做到这一点有什么建议吗?
2条答案
按热度按时间anhgbhbe1#
当与Kafka制作人一起发送这些事件时,您还可以发送一个键和值。如果每个事件都将其事件类型作为键,则在从主题读取流时,还可以获取键:
然后,您可以筛选要处理的事件:
通过这种方式,如果您在一个spark应用程序中订阅了多个主题,您可以处理任意多个事件类型。
r7s23pms2#
如果您运行的是kafka 0.11+,请考虑使用headers功能。标头将被视为一个Map类型,然后您可以基于它们的标头路由消息,而不必首先解析正文。