如何将Dataframe写入kafka?

eeq64g8w  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(792)

我的Dataframe df 看起来像

[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

我为流式查询创建了一个kafka接收器,但没有收到kafka的任何消息。为什么?

ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()
rt4zxlrg

rt4zxlrg1#

你将不会从Kafka那里收到任何东西,因为根据你的代码,你正在尝试选择列 key 以及 value 从只有列的Dataframe age 以及 name . 您需要选择如下所示。
而且,你不需要 writeStream 如果Dataframe是静态的。那样的话,你需要申请 write 以及 save .

import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)

  // +---+-----+
  // |age|name |
  // +---+-----+
  // |3  |Alice|
  // |5  |Bob  |
  // +---+-----+

  // write to Kafka as is with "age" as key and "name" as value
  df.selectExpr("CAST(age AS STRING) as key", "CAST(name AS STRING) as value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()

如果要将数据存储到json字符串中,可以应用以下命令“

// convert columns into json string
  val df2 = df.select(col("name"),to_json(struct($"*"))).toDF("key", "value")
  df2.show(false)

  // +-----+------------------------+
  // |key  |value                   |
  // +-----+------------------------+
  // |Alice|{"age":3,"name":"Alice"}|
  // |Bob  |{"age":5,"name":"Bob"}  |
  // +-----+------------------------+

相关问题