我正在努力实现spark流。
来自Kafka的信息看起来像这样,但有更多的领域
{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
我试图从一个Kafka主题(有多个模式)中读取消息。我需要阅读每条消息,查找事件和源字段,并决定在何处存储数据集。实际数据在字段有效负载中是一个json,它只是一条记录。
有人能帮我实现这个或任何其他替代方案吗?
在同一主题中使用多个模式发送消息并使用它是一种好方法吗?
提前谢谢,
2条答案
按热度按时间fcipmucu1#
您可以创建
Dataframe
来自传入的json对象。创建
Seq[Sring]
json对象的。使用val
df=spark.read.json[Seq[String]]
.在上执行操作
dataframe df
你自己选择。2ledvvac2#
将jsonstring转换为
JavaBean
如果你只关心一些栏目