我正在尝试将sparkDataframe连接到cassandra表。
很遗憾,我们无法立即升级到新的datastax connector 2.5.0并使用直接连接
因此,我正在使用现有的joinwithcassandratable尝试rdd方法
这是我的示例代码
# Cassandra Table Definition
custId: text PRIMARY KEY
custName: text
custAddress: text
val testDF = Seq(("event-01", "cust-01"), ("event-02", "cust-02")).toDF(("eventId", "custId"))
val resultRdd = testDF
.rdd
.leftJoinWithCassandraTable(
keyspaceName = "my_key_space",
tableName = "cust_table",
selectedColumns = AllColumns,
joinColumns = SomeColumns("custId")
)
.map { case (sparkRow, cassandraRow) =>
val resultStruct = cassandraRow
.map(r => Row.fromSeq(r.columnValues))
.orNull
Row.fromSeq(sparkRow.toSeq :+ resultStruct)
}
这将引发java.lang.illegalargumentexception:需求失败:无效的行大小:2而不是1
如果我限制 testDF
只要有custid列,就可以了。
我在哪里犯了错误。如何在完整的rdd上执行连接,而不是只使用键列进行投影
1条答案
按热度按时间kq0g1dla1#
你需要使用
.on(SomeColumns("custId"))
就在左边和Cassandratable连接之后。。。我有一篇关于cadsandra高效连接的博文,它也描述了RDDAPI。。。