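I am learning Spark Streaming and trying to save my sample stock data (plain strings such as msft:28.29) from a Kafka topic to Cassandra, using Spark Streaming and the Spark Cassandra Connector.
Without the save to Cassandra, my code works fine (it reads data from Kafka and does some trivial statistical calculations). Cassandra is configured and the connection is established.
But if I add the following line, to save the raw data to a Cassandra table before processing it: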
stockParsed.saveToCassandra("dashboard","raw_tick")
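In the Spark Streaming UI I see one batch stuck in the "processing" state while all the others are "queued", and no data shows up in Cassandra.
In the Spark console I see only lines like these: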
16/02/16 10:18:40 INFO JobScheduler: Added jobs for time 1455635920000 ms
16/02/16 10:18:50 INFO JobScheduler: Added jobs for time 1455635930000 ms
16/02/16 10:19:00 INFO JobScheduler: Added jobs for time 1455635940000 ms
Here is my code:
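import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStreams
case class Stock(ticker: String, price: Double)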
// ....
val conf = new SparkConf().setAppName("KafkaStream").setMaster("local[*]")
  .set("spark.cassandra.connection.host", "localhost")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")
  .set("spark.cassandra.connection.keep_alive_ms", "60000")
  .set("spark.cassandra.input.split.size_in_mb", "1")
val ssc = new StreamingContext(conf, Seconds(10))
val topicMap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", topicMap).map(_._2)
val stockParsed = lines.map(line => line.split(':')).map(s => Stock(s(0).toString, s(1).toDouble))
//Problem here
stockParsed.saveToCassandra("dashboard","raw_tick",SomeColumns("ticker", "price"))
//Some processing below
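A side note on the parsing step, unrelated to the hanging batch: split(':') followed by toDouble throws on any line that is not exactly ticker:price, and an exception inside a task fails that batch's job. A minimal sketch of a more defensive parser, assuming malformed lines can simply be dropped (the StockParser object is a hypothetical helper, not part of the original code):

```scala
import scala.util.Try

case class Stock(ticker: String, price: Double)

// Hypothetical helper: parses "<ticker>:<price>" and returns None for anything else.
object StockParser {
  def parse(line: String): Option[Stock] =
    line.split(':') match {
      // Exactly two fields and a non-empty ticker are required.
      case Array(ticker, price) if ticker.trim.nonEmpty =>
        // A price that is not a valid number yields None instead of an exception.
        Try(price.trim.toDouble).toOption.map(p => Stock(ticker.trim, p))
      case _ => None
    }
}
```

In the streaming job this could replace the two map calls with something like lines.flatMap(line => StockParser.parse(line).toList), so that bad input is skipped rather than failing the batch.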
My build.sbt:
import sbt.Keys._
name := "KafkaStreamSbt"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-assembly" % "1.6.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-RC1"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.16"
Any idea how to fix this?
1 Answer
Problem solved: I had made a mistake in the Cassandra keyspace configuration. After recreating the keyspace with this script:
the code works fine.
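The script itself is not shown in the answer. Purely as an illustration, and not the answerer's actual script, here is a sketch of how a keyspace and table for a single-node development setup might be created through the connector. The replication settings, the column types, and the choice of ticker as the sole primary key are all assumptions. With that key, each new price overwrites the previous row for the same ticker.

```scala
import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

object CreateSchema {
  def main(args: Array[String]): Unit = {
    // Same connection settings as in the question.
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "localhost")
      .set("spark.cassandra.auth.username", "cassandra")
      .set("spark.cassandra.auth.password", "cassandra")

    CassandraConnector(conf).withSessionDo { session =>
      // Assumption: one local node, so SimpleStrategy with a replication factor of 1.
      session.execute(
        """CREATE KEYSPACE IF NOT EXISTS dashboard
          |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""".stripMargin)
      // Assumption: column names match the Stock case class fields.
      session.execute(
        """CREATE TABLE IF NOT EXISTS dashboard.raw_tick (
          |  ticker text PRIMARY KEY,
          |  price double
          |)""".stripMargin)
    }
  }
}
```

Because of IF NOT EXISTS, this leaves an existing keyspace untouched. A keyspace that is already misconfigured would have to be dropped or altered first, which is presumably what "recreating" refers to in the answer.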