我正在尝试创建一个从kafka到spark的结构化流,spark是一个json字符串。现在我们要将json解析为特定的列,然后以最佳速度将Dataframe保存到cassandra表中。使用spark 2.4和cassandra 2.11(apache),而不是dse。
我试着创建一个直接流,它给出case类的dstream,我在dstream上使用foreachrdd保存到cassandra中,但是每6-7天就会挂起一次。所以我们尝试流式传输,直接提供Dataframe,并可以保存到Cassandra。
val conf = new SparkConf()
.setMaster("local[3]")
.setAppName("Fleet Live Data")
.set("spark.cassandra.connection.host", "ip")
.set("spark.cassandra.connection.keep_alive_ms", "20000")
.set("spark.cassandra.auth.username", "user")
.set("spark.cassandra.auth.password", "pass")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.executor.memory", "2g")
.set("spark.driver.memory", "2g")
.set("spark.submit.deployMode", "cluster")
.set("spark.executor.instances", "4")
.set("spark.executor.cores", "2")
.set("spark.cores.max", "9")
.set("spark.driver.cores", "9")
.set("spark.speculation", "true")
.set("spark.locality.wait", "2s")
val spark = SparkSession
.builder
.appName("Fleet Live Data")
.config(conf)
.getOrCreate()
println("Spark Session Config Done")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val topics = Map("livefleet" -> 1)
import spark.implicits._
implicit val formats = DefaultFormats
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerIP:port")
.option("subscribe", "livefleet")
.load()
val collection = df.selectExpr("CAST(value AS STRING)").map(f => parse(f.toString()).extract[liveevent])
val query = collection.writeStream
.option("checkpointLocation", "/tmp/check_point/")
.format("kafka")
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "trackfleet_db")
.option("table", "locationinfotemp1")
.outputMode(OutputMode.Update)
.start()
query.awaitTermination()
应该是将Dataframe保存到cassandra。但是得到这个错误:
线程“main”org.apache.spark.sql.analysisexception中出现异常:必须使用writestream.start()执行具有流源的查询
2条答案
按热度按时间mklgxw1f1#
如果您使用的是spark2.4.0,那么请尝试使用foreachbatchwriter。它在流式查询上使用基于批处理的writer。
jk9hmnmh2#
根据错误消息,我想说Cassandra不是一个流接收器,我相信你需要使用
.write
```collection.write
.format("org.apache.spark.sql.cassandra")
.options(...)
.save()
import org.apache.spark.sql.cassandra._
// ...
collection.cassandraFormat(table, keyspace).save()
如果这不管用,你就需要一个撰稿人
另外值得一提的是,datastax发布了一个kafka连接器,kafka connect包含在kafka安装(假设为0.10.2)或更高版本中。你可以在这里找到它的公告