Scala: joining a cassandraTable result to a DataFrame

zsohkypk · posted 2021-05-27 in Spark

I am using the DataStax Spark Cassandra Connector to access some data in Cassandra. My requirement is to join an RDD with a Cassandra table, fetch the result, and store it in a Hive table.
I am using joinWithCassandraTable to join with the Cassandra table. After the join, the resulting RDD looks like this:

com.datastax.spark.connector.rdd.CassandraJoinRDD[org.apache.spark.sql.Row, 
com.datastax.spark.connector.CassandraRow] = 
CassandraJoinRDD[17] at RDD at CassandraRDD.scala:19
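For context, given the result type shown above (left side `org.apache.spark.sql.Row`, right side `CassandraRow`), the join presumably looks something like the sketch below. The keyspace, table, and RDD names are assumptions for illustration, not from the original post:

```scala
import com.datastax.spark.connector._

// Hypothetical reconstruction: the left side is an RDD[Row] whose fields
// match the partition key of the Cassandra table. The names "some_keyspace"
// and "some_table" are placeholders.
val joinWithRDD = someRowRdd.joinWithCassandraTable("some_keyspace", "some_table")
// result: CassandraJoinRDD[org.apache.spark.sql.Row, CassandraRow]
```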

I tried the following to convert it to a DataFrame, but it doesn't work:

val data = joinWithRDD.map {
  case (_, cassandraRow) => Row(cassandraRow.columnValues: _*)
}

sqlContext.createDataFrame(data, schema)

I am getting the error below:

java.lang.ClassCastException: cannot assign instance of
   scala.collection.immutable.List$SerializationProxy to field 
   org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of 
   type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Can you help me convert the joinWithCassandraTable result to a DataFrame?

hyrbngr7:

As I can see, you are using a DataFrame on the left side of the join. Instead of using joinWithCassandraTable with the RDD API, I recommend using Spark Cassandra Connector 2.5.x (2.5.1 is the latest), which supports joins in the DataFrame API, and using it directly. It's really simple: you just need to start your job with --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions to activate this functionality, and after that the code is just a normal join on DataFrames:

val parsed = ...some dataframe...
val cassandra = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "stock_info", "keyspace" -> "test"))
  .load

// We can use a left join to detect incorrect data: if some data is missing in
// Cassandra, the symbol field will be null, so we can detect such entries and handle them.
// We can omit the joinType parameter; in that case we process only data that exists in Cassandra.
val joined = parsed.join(cassandra, cassandra("symbol") === parsed("ticker"), "left")
   .drop("ticker")
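The original requirement was to store the joined result in a Hive table; with the DataFrame API that is one more step. A minimal sketch, assuming a SparkSession built with Hive support and a made-up target table name:

```scala
// Write the joined DataFrame into Hive.
// "mydb.joined_stock_info" is an illustrative name, not from the original post;
// this requires a SparkSession created with .enableHiveSupport().
joined.write
  .mode("overwrite")
  .saveAsTable("mydb.joined_stock_info")
```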

The full source code and README are here.
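For completeness, a sketch of how such a job might be launched so that the connector and the SQL extensions are available. The host name and jar name are placeholders; the package coordinates assume Scala 2.12 and the 2.5.1 version mentioned above:

```shell
# Submit with the connector package on the classpath and the extensions enabled.
# Adjust the Scala and connector versions to match your Spark build.
spark-submit \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  --conf spark.cassandra.connection.host=cassandra-host \
  my-job.jar
```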
