问题:本质上,这意味着,不是为每个流式处理记录运行一个c表连接,而是在spark流式处理中为每个记录的微批(微批处理)运行一个连接吗?
我们几乎完成使用sparksql2.4.x版本,datastax spark cassandra connector for cassandra-3.x版本。
但是在下面的场景中,关于效率有一个基本的问题。
对于流数据记录(即streamingdataset),我需要从cassandra(c)表中查找现有记录(即cassandradataset)。
即
Dataset<Row> streamingDataSet = //kafka read dataset
Dataset<Row> cassandraDataset= //loaded from C* table those records loaded earlier from above.
要查找数据,我需要加入上述数据集
即。
Dataset<Row> joinDataSet = cassandraDataset.join(cassandraDataset).where(//somelogic)
进一步处理joindataset以实现业务逻辑。。。
在上面的场景中,我的理解是,对于从kafka流接收到的每条记录,它将查询c表,即数据库调用。
如果c表包含数十亿条记录,是否需要大量的时间和网络带宽?为了改进查找c表,应该遵循什么方法/程序?
在这种情况下,最好的解决方案是什么?我无法从c表加载一次并查找,因为数据一直添加到c*表中。。。i、 e.新的查找可能需要新的持久化数据。
如何处理这种情况?有什么建议吗。。
1条答案
按热度按时间ghhaqwfi1#
如果您使用的是apache cassandra,那么只有一种可能性可以有效地连接cassandra中的数据——通过rddapi
joinWithCassandraTable
. spark cassandra connector(scc)的开源版本只支持它,而在dse版本中,有一个代码允许对cassandra执行有效的连接,也支持spark sql,即所谓的dse直接连接。如果你用join
在针对cassandra表的sparksql中,spark需要从cassandra读取所有数据,然后执行join—这非常慢。我没有oss scc为spark结构化流媒体做连接的例子,但是我有一些“普通”连接的例子,比如: