spark full rdd joinwithcassandratable java.lang.illegalargumentexception:要求失败:无效行大小:代替

k5hmc34c  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(460)

我正在尝试将sparkDataframe连接到cassandra表。
很遗憾,我们无法立即升级到新的datastax connector 2.5.0并使用直接连接
因此,我正在使用现有的joinwithcassandratable尝试rdd方法
这是我的示例代码


# Cassandra Table Definition

custId: text PRIMARY KEY
custName: text
custAddress: text

val testDF = Seq(("event-01", "cust-01"), ("event-02", "cust-02")).toDF(("eventId", "custId"))

val resultRdd = testDF
    .rdd
    .leftJoinWithCassandraTable(
      keyspaceName = "my_key_space",
      tableName = "cust_table",
      selectedColumns = AllColumns,
      joinColumns = SomeColumns("custId")
    )
    .map { case (sparkRow, cassandraRow) =>
      val resultStruct = cassandraRow
        .map(r => Row.fromSeq(r.columnValues))
        .orNull
      Row.fromSeq(sparkRow.toSeq :+ resultStruct)
    }

这将引发java.lang.illegalargumentexception:需求失败:无效的行大小:2而不是1
如果我限制 testDF 只要有custid列,就可以了。
我在哪里犯了错误。如何在完整的rdd上执行连接,而不是只使用键列进行投影

kq0g1dla

kq0g1dla1#

你需要使用 .on(SomeColumns("custId")) 就在左边和Cassandratable连接之后。。。
我有一篇关于cadsandra高效连接的博文,它也描述了RDDAPI。。。

相关问题