spark结构化流式处理具有唯一消息模式的多个kafka主题

zc0qhyus  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(360)

当前状态:
今天,我构建了一个spark结构化流应用程序,它使用一个kafka主题,其中包含json消息。嵌入在kafka主题的值中包含一些关于消息字段的源和模式的信息。消息的简化版本如下所示:

{
  "source": "Application A",
  "schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
  "message": {"countryId": "21", "name": "Poland"}
}

今天的系统中有一些kafka主题,我使用subscribe选项为每个主题部署了这个spark结构化流应用程序。应用程序应用主题的唯一模式(通过批量读取kafka主题中的第一条消息并Map该模式进行攻击),并以parquet格式将其写入hdfs。
所需状态:
我的组织将很快开始产生越来越多的主题,我不认为每个主题的spark应用程序模式能够很好地扩展。起初,subscribepattern选项似乎对我很有用,因为这些主题有一种层次结构的形式,但现在我只能将模式应用于hdfs中的不同位置。
在未来,我们很可能会有数千个主题,希望只有25个左右的Spark应用程序。
有人对如何做到这一点有什么建议吗?

anhgbhbe

anhgbhbe1#

当与Kafka制作人一起发送这些事件时,您还可以发送一个键和值。如果每个事件都将其事件类型作为键,则在从主题读取流时,还可以获取键:

val kafkaKvPair = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

然后,您可以筛选要处理的事件:

val events = kafkaKvPair
  .filter(f => f._1 == "MY_EVENT_TYPE")

通过这种方式,如果您在一个spark应用程序中订阅了多个主题,您可以处理任意多个事件类型。

r7s23pms

r7s23pms2#

如果您运行的是kafka 0.11+,请考虑使用headers功能。标头将被视为一个Map类型,然后您可以基于它们的标头路由消息,而不必首先解析正文。

相关问题