嗨,我在一个Spark流项目工作。在这个项目中,我需要解析从kafka流接收到的数据(proto-buf消息)
我不知道如何解析Kafka的原版buf mesage。
我试图理解下面的代码来开始解析protobuf消息。
def main(args:array[string]){
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","student").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
有人能给我举一些例子来说明如何一步一步地解析proto-buf消息吗?我只需要一些参考资料,如何使用它在Spark流应用程序。
1条答案
按热度按时间1bqhqjot1#
我以这种方式使用结构化流媒体: