在使用scala spark写入kafka主题之前,为Dataframe定义架构

mzaanser  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(187)

我有以下dataframe(finaldataframe)模式

root
     |-- sentence: string (nullable = true)
     |-- category: string (nullable = true)
     |-- Id: string (nullable = true)

我定义了以下模式

def defineS3SinkSchema() : StructType = {
    new StructType()
      .add("payload", new StructType()
        .add("sentence", StringType)
        .add("Id", LongType)
        .add("category", StringType)
        )
  }

我想使用上面的模式对上面定义的Dataframe进行修改,并写入一个Kafka主题。但我不知道如何将已定义的模式与Dataframe集成。下面是写Kafka的代码主题。

val jsonFormatData = finalDataFrame.select(col("key").cast("string").alias("key"),
      to_json(struct(
        col("sentence"),
        col("category"),
        col("key").as("Id")
      )).alias("value"))
    jsonFormatData.printSchema()
    val writeStream = jsonFormatData
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "myTopic.val")
      .option("checkpointLocation", "test_path")
      .start()
    writeStream.awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题