spark Streaming:从具有多个模式的kafka中读取数据

wixjitnu  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(375)

我正在努力实现spark流。
来自Kafka的信息看起来像这样,但有更多的领域

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}

我试图从一个Kafka主题(有多个模式)中读取消息。我需要阅读每条消息,查找事件和源字段,并决定在何处存储数据集。实际数据在字段有效负载中是一个json,它只是一条记录。
有人能帮我实现这个或任何其他替代方案吗?
在同一主题中使用多个模式发送消息并使用它是一种好方法吗?
提前谢谢,

fcipmucu

fcipmucu1#

您可以创建 Dataframe 来自传入的json对象。
创建 Seq[Sring] json对象的。
使用val df=spark.read.json[Seq[String]] .
在上执行操作 dataframe df 你自己选择。

2ledvvac

2ledvvac2#

将jsonstring转换为 JavaBean 如果你只关心一些栏目

相关问题