Spark Structured Streaming: schema inference in Scala

vecaoik1 · posted 2021-05-27 in Spark

I am trying to infer a dynamic JSON schema from a Kafka topic. I found the code below in a blog post; it uses PySpark to infer the schema.

from pyspark.sql.functions import col, expr

def read_kafka_topic(topic):
    # Batch-read the whole topic and keep only the latest value per key
    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r"))
               .select("r.value"))

    # Infer the JSON schema from the message payloads
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)
    return df_read

My attempt in Scala:


val df_read = spark.read.json(df_json.rdd.map(x => x))

But I am getting the following compile error:

cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
val df_read = spark.read.json(df_json.rdd.map(x => x))

Is there any way to do this? Please help.
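
For context, spark.read.json in Scala has no overload that accepts an RDD[Row]: since Spark 2.2 it takes a Dataset[String] (the older RDD[String] overload is deprecated). A minimal sketch of the equivalent batch read, assuming df_json is the Scala counterpart of the DataFrame above with a single string column named value:

import spark.implicits._  // encoder needed for .as[String]

// spark.read.json expects the JSON documents as a Dataset[String],
// so extract the string column instead of passing Row objects.
val df_read = spark.read.json(df_json.select("value").as[String])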


wnavrhmk · 1#

Structured Streaming does not work with RDDs.
Structured Streaming also does not perform schema inference: you need to define the schema yourself.
E.g. for a file source:

val dataSchema = "Recorded_At timestamp, Device string, Index long, Model string, User string, _corrupt_record String, gt string, x double, y double, z double"
val dataPath = "dbfs:/mnt/training/definitive-guide/data/activity-data-stream.json"

val initialDF = spark
  .readStream                             // Returns DataStreamReader
  .option("maxFilesPerTrigger", 1)        // Force processing of only 1 file per trigger 
  .schema(dataSchema)                     // Required for all streaming DataFrames
  .json(dataPath)                         // The stream's source directory and file type
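
For completeness, a hedged sketch of how such a streaming DataFrame might then be started; the console sink and checkpoint path are illustrative, not part of the original answer.

// Illustrative only: run the stream into a console sink.
val query = initialDF.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/activity")  // hypothetical path
  .start()

query.awaitTermination()  // block until the stream terminates (standalone apps)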

E.g. for Kafka, as taught in the Databricks training materials:

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

val kafkaServer = "server1.databricks.training:9092"  // US (Oregon)
// kafkaServer = "server2.databricks.training:9092"   // Singapore

import spark.implicits._                              // Needed for the $"value" column syntax

val editsDF = spark.readStream                        // Get the DataStreamReader
  .format("kafka")                                    // Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafkaServer)     // Configure the Kafka server name and port
  .option("subscribe", "en")                          // Subscribe to the "en" Kafka topic 
  .option("startingOffsets", "earliest")              // Rewind stream to beginning when we restart notebook
  .option("maxOffsetsPerTrigger", 1000)               // Throttle Kafka's processing of the streams
  .load()                                             // Load the DataFrame
  .select($"value".cast("STRING"))                    // Cast the "value" column to STRING

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType}

lazy val schema = StructType(List(
  StructField("channel", StringType, true),
  StructField("comment", StringType, true),
  StructField("delta", IntegerType, true),
  StructField("flag", StringType, true),
  StructField("geocoding", StructType(List(            //  (OBJECT): Added by the server, field contains IP address geocoding information for anonymous edit.
    StructField("city", StringType, true),
    StructField("country", StringType, true),
    StructField("countryCode2", StringType, true),
    StructField("countryCode3", StringType, true),
    StructField("stateProvince", StringType, true),
    StructField("latitude", DoubleType, true),
    StructField("longitude", DoubleType, true)
  )), true),
  StructField("isAnonymous", BooleanType, true),
  StructField("isNewPage", BooleanType, true),
  StructField("isRobot", BooleanType, true),
  StructField("isUnpatrolled", BooleanType, true),
  StructField("namespace", StringType, true),           //   (STRING): Page's namespace. See https://en.wikipedia.org/wiki/Wikipedia:Namespace 
  StructField("page", StringType, true),                //   (STRING): Printable name of the page that was edited
  StructField("pageURL", StringType, true),             //   (STRING): URL of the page that was edited
  StructField("timestamp", TimestampType, true),        //   (STRING): Time the edit occurred, in ISO-8601 format
  StructField("url", StringType, true),
  StructField("user", StringType, true),                //   (STRING): User who made the edit or the IP address associated with the anonymous editor
  StructField("userURL", StringType, true),
  StructField("wikipediaURL", StringType, true),
  StructField("wikipedia", StringType, true)            //   (STRING): Short name of the Wikipedia that was edited (e.g., "en" for the English)
))

import org.apache.spark.sql.functions.from_json

val jsonEdits = editsDF.select(
  from_json($"value", schema).as("json")) 
...
...
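
A common workaround for the original question, offered here as a sketch under stated assumptions rather than as part of this answer: run a one-off batch read of the topic, let spark.read.json infer the schema, then reuse that schema in the stream. Note the inferred schema only reflects fields present at inference time; fields added to the topic later will be missing.

// Sketch of a possible workaround (reuses kafkaServer, implicits, and
// from_json from the snippets above).
val batchJson = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("subscribe", "en")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val inferredSchema = spark.read.json(batchJson).schema   // batch-time schema inference

val inferredEdits = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("subscribe", "en")
  .load()
  .select(from_json($"value".cast("STRING"), inferredSchema).as("json"))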
