value split不是(string,string)的成员

utugiqy6  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(436)

我试图从Kafka读取数据,并通过spark rdd存储到cassandra表中。
编译代码时出错:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)

[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found

[error] (compile:compileIncremental) Compilation failed

下面的代码:当我通过交互式手动运行代码时 spark-shell 它工作得很好,但是在编译 spark-submit 错误来了。

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
f87krz0w

f87krz0w1#

Kafka的所有信息都是加密的。原来的Kafka流,在这种情况下 messages ,是一个元组流 (key,value) .
正如编译错误所指出的,没有 split 方法。
我们要做的是:

messages.map{ case (key, value)  => value.split(','))} ...
kb5ga3dv

kb5ga3dv2#

KafkaUtils.createDirectStream 返回键和值的元组(因为kafka中的消息是可选的键)。你的情况是 (String, String) . 如果要拆分值,必须首先将其取出:

val lines = 
  messages
   .map(line => line._2.split(','))
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))

或使用部分函数语法:

val lines = 
  messages
   .map { case (_, value) => value.split(',') }
   .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))

相关问题