如何解析spark 1.6中kafka流接收到的spark流中的proto buf mesage

bf1o4zei  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(408)

嗨,我在一个Spark流项目工作。在这个项目中,我需要解析从kafka流接收到的数据(proto-buf消息)
我不知道如何解析Kafka的原版buf mesage。
我试图理解下面的代码来开始解析protobuf消息。
def main(args:array[string]){

val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val ds1 = spark.readStream.format("kafka").
  option("kafka.bootstrap.servers","localhost:9092").
  option("subscribe","student").load()

val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

val query = ds2.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

}
有人能给我举一些例子来说明如何一步一步地解析proto-buf消息吗?我只需要一些参考资料,如何使用它在Spark流应用程序。

1bqhqjot

1bqhqjot1#

我以这种方式使用结构化流媒体:

import MessagesProto #Your proto.py file
from datetime import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf

def message_proto(value):
    m = MessagesProto.message_x()
    m.ParseFromString(value)
    return({'x': y,
            'z': w
           })
schema_impressions = StructType() \
    .add("x", StringType()) \
    .add("z", TimestampType())

proto_udf = udf(message_proto, schema_impressions)

class StructuredStreaming():

    def structured_streming(self):       

        stream = self.spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", self.kafka_bootstrap_servers) \
          .option("subscribe", self.topic) \
          .option("startingOffsets", self.startingOffsets) \
          .option("max.poll.records", self.max_poll_records) \
          .option("auto.commit.interval.ms", self.auto_commit_interval_ms) \
          .option("session.timeout.ms", self.session_timeout_ms) \
          .option("key.deserializer", self.key_deserializer) \
          .option("value.deserializer", self.value_deserializer) \
          .load()

        self.query = stream \
        .select(col("value")) \
        .select(proto_udf("value").alias("value_udf")) \
        .select("value_udf.x", "valued_udf.y)

相关问题