我正在使用datastax spark cassandra连接器访问cassandra中的一些数据。我的要求是将rdd与cassandra表连接起来,获取结果并将其存储在hive表中。
我使用joinwithcassandratable来连接cassadra表。加入后,恢复rdd如下所示
com.datastax.spark.connector.rdd.CassandraJoinRDD[org.apache.spark.sql.Row,
com.datastax.spark.connector.CassandraRow] =
CassandraJoinRDD[17] at RDD at CassandraRDD.scala:19
我尝试了以下步骤来转换为Dataframe,但没有一种方法是有效的。
val data=joinWithRDD.map{
case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)
}
sqlContext.createDataFrame(data,schema)
我的错误率正在下降
java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of
type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
你能帮我转换一下吗 joinWithCassandraTable
到Dataframe?
1条答案
按热度按时间hyrbngr71#
如我所见,您正在连接的左侧使用Dataframe。而不是使用
joinWithCassandraTable
如果使用rddapi,我建议使用spark cassandra connector 2.5.x(2.5.1是最新的),它在dataframeapi中支持join,并直接使用它。这真的很简单,你只需要从--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
要激活此功能,在此之后,代码仅在Dataframe上使用普通联接:完整的源代码和自述文件在这里。