Hi, I'm new to Spark Streaming. I'm trying to implement a streaming solution that reads JSON messages from Kafka and stores them in Cassandra. The problem I'm facing is that from_json is not converting the JSON into my case class.
Here is my JSON:
{"brand":"hortense","category":"clothing","description":"Hortense B. Hewitt 25027 Peplum Garter Set","mode":"dinner's club","orditems":"2","productcode":"8f6e9f55-c69d-4b2c-a249-572b4e53fa9a","unitprice":"3360"}
build.sbt
scalaVersion := "2.11.8"
val spark="2.3.1"
val kafka="0.10.1"
val cassandra="3.2"
val cassandraConnectot="2.3.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
//Tips Taken from:https://www.scala-sbt.org/1.x/docs/Resolvers.html
resolvers += "DefaultMavenRepository" at "https://mvnrepository.com/artifact/"
dependencyOverrides += "com.google.guava" % "guava" % "15.0"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.6"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.6"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.6"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-cbor
dependencyOverrides += "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.6"
//libraryDependencies += "org.scala-sbt" % "sbt" % "1.2.8" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % spark
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" %spark
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies +="com.typesafe.play" %"play-json_2.11" % "2.5.0" exclude("com.fasterxml.jackson.core","jackson-databind")
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
libraryDependencies +="com.typesafe" % "config" %"1.3.2"
libraryDependencies +="com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectot
libraryDependencies +="com.datastax.spark" %% "spark-cassandra-connector-embedded" % cassandraConnectot % Test
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1" % "provided"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.3.0"
=================================== Main class ======================================================
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
//spark-submit --master local --driver-memory 1g --executor-memory 1g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --class
// TransctionReceiverStructuredTreaming /Users/abhinav/Downloads/SparkStreamingExample/target/scala-2.11/sparkstreamingexample_2.11-0.1.0-SNAPSHOT.jar
object TransctionReceiverStructuredTreaming extends SparkSessionBuilder {
  def main(args: Array[String]) {
    case class TransctionData(productcode: String, description: String, brand: String, category: String, unitprice: String, orditems: String, mode: String)

    val transactionSchema = StructType(Array(
      StructField("brand", StringType, true),
      StructField("category", StringType, true),
      StructField("description", StringType, true),
      StructField("mode", StringType, true),
      StructField("orditems", DoubleType, true),
      StructField("productcode", StringType, true),
      StructField("unitprice", StringType, true)))

    val spark = buildSparkSession
    import spark.implicits._
    /*val spark = SparkSession.builder
      .master("local")
      .appName("TransctionReceiver")
      .getOrCreate();*/

    val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
    import spark.implicits._

    val topics = List("-first_topic")
    val rawKafkaDF = spark.sqlContext.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "-first_topic")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("startingOffsets", "earliest")
      // .option("endingOffsets", "latest")
      //.option("auto.offset.reset", "earliest")
      // .option("group.id", "group66554")
      .load()
    // println("rawKafkaDF writing in kafka>>>>"+rawKafkaDF.count())

    import spark.implicits._
    val df = rawKafkaDF
      .selectExpr("CAST(value AS STRING)").as[String]
      .flatMap(_.split("\n"))

    df.writeStream
      .format("console")
      .trigger(Trigger.Once())
      .start().awaitTermination()

    //val jsons = df.select(from_json($"value", transactionSchema) as "data").select("data.*")
    val jsons1 = df.select($"value" cast "string" as "json")
      .select(from_json($"json", transactionSchema) as "data")
      .select("data.*")

    jsons1.writeStream
      .format("console")
      .trigger(Trigger.Once())
      .start().awaitTermination()
    println("Print 2 end >>>>")

    val sink = jsons1
      .writeStream
      .queryName("KafkaToCassandraForeach")
      .outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start()
    sink.awaitTermination()

    ssc.start()
  }
}
=============================================
When I run this program, I can see that this query:
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start().awaitTermination()
===== gives this output =====
+--------------------+
| value|
+--------------------+
|{"brand":"adult",...|
|{"brand":"polo","...|
|{"brand":"timberl...|
+--------------------+
But from_json is not producing any data, and only nulls are being inserted into Cassandra. The second query:
jsons1.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start().awaitTermination()
+-----+--------+-----------+----+--------+-----------+---------+
|brand|category|description|mode|orditems|productcode|unitprice|
+-----+--------+-----------+----+--------+-----------+---------+
| null| null| null|null| null| null| null|
| null| null| null|null| null| null| null|
| null| null| null|null| null| null| null|
| null| null| null|null| null| null| null|
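For what it's worth, the all-null rows can be reproduced without Kafka. The sketch below is a hypothetical standalone check (the object name SchemaCheck, the local SparkSession, and the two-field schemas are only for the test); it suggests the orditems field is the culprit: the JSON carries it as the string "2" while the schema declares it as DoubleType, and when one field fails to parse, from_json appears to drop the whole record to null.
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object SchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("SchemaCheck").getOrCreate()
    import spark.implicits._

    // The sample message from the question, as a one-column DataFrame of JSON strings
    val sample = Seq(
      """{"brand":"hortense","category":"clothing","description":"Hortense B. Hewitt 25027 Peplum Garter Set","mode":"dinner's club","orditems":"2","productcode":"8f6e9f55-c69d-4b2c-a249-572b4e53fa9a","unitprice":"3360"}"""
    ).toDF("json")

    // Schema as in the question: orditems declared as DoubleType
    val withDouble = StructType(Array(
      StructField("brand", StringType, true),
      StructField("orditems", DoubleType, true)))

    // Same fields, but everything declared as StringType
    val allStrings = StructType(Array(
      StructField("brand", StringType, true),
      StructField("orditems", StringType, true)))

    // "2" is quoted in the JSON, so the DoubleType field should fail and the row should come back null
    sample.select(from_json($"json", withDouble) as "data").select("data.*").show()

    // With StringType everywhere the same message should parse into populated columns
    sample.select(from_json($"json", allStrings) as "data").select("data.*").show()

    spark.stop()
  }
}
```
Under that assumption, declaring orditems as a string, as the working solution below does by deriving the schema from the Bean44 case class, makes the nulls go away.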
Working solution:
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import sampleTestClass.Bean44
//spark-submit --master local --driver-memory 1g --executor-memory 1g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --class
// TransctionReceiverStructuredTreaming /Users/abhinav/Downloads/SparkStreamingExample/target/scala-2.11/sparkstreamingexample_2.11-0.1.0-SNAPSHOT.jar
case class Bean44(brand:String,category:String,description:String,mode:String,orditems:String,productcode:String,unitprice:String)
object TransctionReceiverStructuredTreaming extends SparkSessionBuilder {
  def main(args: Array[String]) {
    case class TransctionData(productcode: String, description: String, brand: String, category: String, unitprice: String, orditems: String, mode: String)

    val transactionSchema = StructType(Array(
      StructField("brand", StringType, true),
      StructField("category", StringType, true),
      StructField("description", StringType, true),
      StructField("mode", StringType, true),
      StructField("orditems", DoubleType, true),
      StructField("productcode", StringType, true),
      StructField("unitprice", StringType, true)))

    val spark = buildSparkSession
    import spark.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(30))

    val topics = List("-first_topic")
    val rawKafkaDF = spark.sqlContext.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "-first_topic")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("startingOffsets", "earliest")
      .load()

    val schema = Encoders.product[Bean44].schema

    val df1 = rawKafkaDF
      .selectExpr("CAST(value AS STRING)").as[String]
      .flatMap(_.split("\n")).toDF()
    val df = df1.withColumn("data", from_json(col("value"), schema)).select("data.*").as[Bean44]

    df.writeStream
      .format("console")
      .trigger(Trigger.Once())
      .start().awaitTermination()

    val sink = df
      .writeStream
      .queryName("KafkaToCassandraForeach")
      .outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start().awaitTermination()

    ssc.start()
  }
}
1 Answer
I think you're very close.
Steps I followed:
- Load the JSON as a string column in a DataFrame.
- Build the schema from Bean44 (Encoders.product[Bean44].schema).
- Use from_json to parse the string into the Bean44 struct type.
- df.select("data.*"), same as you, plus .as[Bean44] to get a Dataset[Bean44].
```
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.functions._

object JsonToCase {
  def main(args: Array[String]): Unit = {
    // Derive the schema directly from the case class instead of hand-writing a StructType
    val schema = Encoders.product[Bean44].schema
  }
}

case class Bean44(brand:String,category:String,description:String,mode:String,orditems:String,productcode:String,unitprice:String)
```
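To make the steps concrete, here is a self-contained batch sketch (no Kafka needed) that parses the sample message from the question with the Encoders-derived schema. The object name JsonToCaseBatchTest, the local SparkSession, and the column name json are only for this test, and it assumes the Bean44 case class above is in scope:
```
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

object JsonToCaseBatchTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("JsonToCaseBatchTest").getOrCreate()
    import spark.implicits._

    // The sample message from the question, as a one-column DataFrame of JSON strings
    val sample = Seq(
      """{"brand":"hortense","category":"clothing","description":"Hortense B. Hewitt 25027 Peplum Garter Set","mode":"dinner's club","orditems":"2","productcode":"8f6e9f55-c69d-4b2c-a249-572b4e53fa9a","unitprice":"3360"}"""
    ).toDF("json")

    // Schema derived from the case class: every field is StringType, matching the JSON payload
    val schema = Encoders.product[Bean44].schema

    // Parse the JSON, flatten the struct, and map each row to the case class
    val parsed = sample
      .withColumn("data", from_json(col("json"), schema))
      .select("data.*")
      .as[Bean44]

    parsed.show(false) // should print the populated row instead of nulls
    spark.stop()
  }
}
```
If numeric types are needed later (e.g. orditems as a number), it is usually simpler to parse everything as strings like this and cast the columns afterwards.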