我试着用cassandra作为我们一些spark工作中的关键价值查找库。
我们主要使用Dataframe,并且已经从rddapi中移除。
而不是与表连接,将它们加载到spark或
推动加入Cassandra并采取措施避免大规模
表扫描,我想我可以写一个spark自定义项,连接到cassandra,a查找一个键
另外,我还想将结果行转换为case类对象并返回该对象。
我从下面这个问题的回答中得到了一些信息。withsessiondo重用每个节点上可用的底层jvm级会话spark cassandra connector
val connector = CassandraConnector(sparkConf) // I Know this is serializable.
def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
val result = session.execute( stmt.bind(key) )
MyCaseClass(
fieldl1 = result.getString(0),
fieldl2 = result.getInt(1)
...
)
}
})
会话不可序列化,因此我们无法在udf外部创建一个会话并将其传入,以便使用Map管理器将行转换为case类示例。另一种方法是使用mapping manager,
def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val manager = new MappingManager(session) // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
mapperClass.get(key)
}
})
我是新来Cassandra的,所以请耐心回答我的几个问题。
这些方法中有什么我不知道的缺陷吗?
在第二种方法中,我知道我们正在创建一个新的mappingmanager(session),每次调用udf。这还会使用jvm级别的会话并打开更多的会话吗?每次调用都示例化mappingmanager对吗?会话不可序列化,因此我无法在外部创建它并将其传递给udf。
将结果行转换为case类的对象的其他方法有哪些?
有没有更好的方法来做这种查找?
1条答案
按热度按时间1zmg4dgp1#
您试图模拟spark cassandra connector(scc)在引擎盖下所做的工作,但是您的实现将比scc慢得多,因为您使用的是同步api,并且一个接一个地获取所有数据,而scc使用的是异步api,并且并行地拉取多行的数据。
实现所需的最佳方法是使用cassandra优化连接(通常称为“直接连接”)。这种连接始终可用于rddapi,但在很长一段时间内,只有商业版的连接器才可用于dataframeapi。但是自从SCC2.5.0(于2020年5月发布)以来,这个功能也可以在开源版本中使用,所以您可以使用它来代替构建它的仿真。只有通过传递
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
配置sparksession时(例如通过命令行)。之后,您可以通过完全或部分主键与cassandra表执行join,scc将自动将join转换为对cassandra的单个请求,这些请求将非常有效地执行。您可以通过执行explain
在连接的Dataframe上,您应该看到如下内容(查找字符串cassandra direct join):我最近写了一篇很长的博客文章,解释了如何使用dataframe和rddapi在cassandra中执行有效的数据连接-我不想在这里重复:-)