尝试理解结构化流媒体

vlf7wbxs  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(331)

我对apache spark还很陌生,我试图理解scala中apache kafka的结构化流媒体,但是到目前为止,没有什么对我有利的东西,基本上我想从kafka发送json,使用spark结构化流媒体处理它,然后发送回kafka。我尝试了网站上给出的例子,但不起作用。
这是我的密码:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object dataset_kafka {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .load()

       df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("topic", "test1")
      .option("checkpointLocation", "/home/hduser/Desktop/tempo")
      .start()
      .awaitTermination()
  }
}

我哪里出了问题有什么帮助吗?
我以以下格式从Kafka发送json:

{"schema":"Hiren","payload":"123"}
nfzehxib

nfzehxib1#

我最近出版了一本技术资产选集,重点介绍apachespark的结构化流媒体。其中一个博客关注结构化流媒体:也就是说,使用结构化流媒体api,将数据从kafka流到spark。
我劝你看一下。请注意,只有spark 2.2支持回写Kafka。
https://databricks.com/blog/2017/08/24/anthology-of-technical-assets-on-apache-sparks-structured-streaming.html

相关问题