I am trying to read data from Kafka and store it into a Cassandra table through a Spark RDD.
I get an error when compiling the code:
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
The code is below. When I run it manually in the interactive spark-shell it works fine, but when I compile it and run it with spark-submit the error appears.
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
2 Answers
f87krz0w1#
All records in Kafka are keyed. The original Kafka stream, messages in this case, is a stream of (key, value) tuples, and as the compilation error points out, a tuple has no split method. What we want to do here is:
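A minimal sketch of that fix, assuming the same imports and the same four comma-separated fields as in the question's snippet:

// messages is a DStream[(String, String)]; split the value, not the tuple
val lines = messages
  .map { case (_, value) => value.split(',') }
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))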
kb5ga3dv2#
KafkaUtils.createDirectStream returns tuples of key and value (because messages in Kafka are optionally keyed), so in your case each element is a (String, String). If you want to split the value, you first have to take it out of the tuple, or use partial-function syntax; both variants are sketched below.
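A minimal sketch of the first variant (assuming the same field layout as in the question, with one city name followed by three numeric fields):

// Take the value out of each (key, value) tuple before splitting
val lines = messages
  .map(_._2)
  .map(_.split(','))
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

And the same thing with partial-function syntax, pattern-matching the tuple directly inside map:

// Destructure the (key, value) tuple and split the value in one step
val lines = messages
  .map { case (_, value) => value.split(',') }
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

The saveToCassandra call from the question can then be applied to lines unchanged.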