屏蔽来自kafka流的数据

6yt4nkrj  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(530)

我使用spark结构化流来从kafka流式传输数据,这给了我以下模式的Dataframe

Column     Type
key        binary
value      binary
topic      string
partition  int
offset     long
timestamp  long
timestampType   int

这里的值列是二进制格式的,但它实际上是一个结构类型的json字符串,需要读取json结构,屏蔽其中的几个字段并写入数据。

8dtrkrch

8dtrkrch1#

您可以按照structured streaming+kafka integration指南中给出的指导来理解如何将二进制值转换为字符串值。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(value AS STRING)")
  .as[String]

然后可以根据实际的json结构定义模式,例如:

val schema: StructType = new StructType()
    .add("field1", StringType)
    .add("field2", ArrayType(new StructType()
      .add("f2", StringType)
      .add("f2", DoubleType)
    ))

使用 from_json 函数将允许您处理json字符串中的数据,请参阅文档,例如:

df.selectExpr("CAST(value AS STRING)")
  .select(from_json('json, schema).as("data"))

这样,您就可以通过使用结构化api(如 withColumn 以及 drop .
如果不想定义整个模式,可以考虑使用 get_json_object .

相关问题