使用cassandra连接器启动udf查找键

osh3o9ms  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(466)

我试着用cassandra作为我们一些spark工作中的关键价值查找库。
我们主要使用Dataframe,并且已经从rddapi中移除。
而不是与表连接,将它们加载到spark或
推动加入Cassandra并采取措施避免大规模
表扫描,我想我可以写一个spark自定义项,连接到cassandra,a查找一个键
另外,我还想将结果行转换为case类对象并返回该对象。
我从下面这个问题的回答中得到了一些信息。withsessiondo重用每个节点上可用的底层jvm级会话spark cassandra connector

val connector = CassandraConnector(sparkConf) // I Know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
        val result = session.execute( stmt.bind(key) )
        MyCaseClass(
           fieldl1 = result.getString(0),
           fieldl2 = result.getInt(1)
           ...
        )
    }
})

会话不可序列化,因此我们无法在udf外部创建一个会话并将其传入,以便使用Map管理器将行转换为case类示例。另一种方法是使用mapping manager,

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val manager = new MappingManager(session)   // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
        val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
        mapperClass.get(key)
    }
})

我是新来Cassandra的,所以请耐心回答我的几个问题。
这些方法中有什么我不知道的缺陷吗?
在第二种方法中,我知道我们正在创建一个新的mappingmanager(session),每次调用udf。这还会使用jvm级别的会话并打开更多的会话吗?每次调用都示例化mappingmanager对吗?会话不可序列化,因此我无法在外部创建它并将其传递给udf。
将结果行转换为case类的对象的其他方法有哪些?
有没有更好的方法来做这种查找?

1zmg4dgp

1zmg4dgp1#

您试图模拟spark cassandra connector(scc)在引擎盖下所做的工作,但是您的实现将比scc慢得多,因为您使用的是同步api,并且一个接一个地获取所有数据,而scc使用的是异步api,并且并行地拉取多行的数据。
实现所需的最佳方法是使用cassandra优化连接(通常称为“直接连接”)。这种连接始终可用于rddapi,但在很长一段时间内,只有商业版的连接器才可用于dataframeapi。但是自从SCC2.5.0(于2020年5月发布)以来,这个功能也可以在开源版本中使用,所以您可以使用它来代替构建它的仿真。只有通过传递 spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions 配置sparksession时(例如通过命令行)。之后,您可以通过完全或部分主键与cassandra表执行join,scc将自动将join转换为对cassandra的单个请求,这些请求将非常有效地执行。您可以通过执行 explain 在连接的Dataframe上,您应该看到如下内容(查找字符串cassandra direct join):

scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
   +- *(1) Range (1, 5, step=1, splits=8)

我最近写了一篇很长的博客文章,解释了如何使用dataframe和rddapi在cassandra中执行有效的数据连接-我不想在这里重复:-)

相关问题