我正在使用spark-sql-2.4.1v在poc中进行流式处理。
我有一个场景,需要从hdfs/cassandra表中加载以前的数据,并将流内容与之进行比较,即加入非流Dataframe和流Dataframe。
如果发现内容,我需要更新hdfs/cassandra表中的数据,否则就向表/hdfs添加新行。
下一次相同的流(以上添加)的内容来,我需要再次更新记录。。。
所以我的问题是,当我从hdfs/cassandra表中加载dataframe时,它最初不会包含最近添加的数据。
那么,如何在每次处理新的一批流时从hdfs/cassandra表中刷新加载的Dataframe呢?一般来说,处理这种情况的做法是什么?
谢谢您。
1条答案
按热度按时间0s0u357o1#
免责声明:我只能说Cassandra,而不是hdfs。
基本上,在cassandra中,insert和update之间没有区别—您可以发布
UPDATE
命令,即使具有相应主键的记录不存在,也可以执行INSERT
对于已经存在的行-在这两种情况下,数据都将设置为给定值。谈到刷新来自cassandra的数据-对于这种情况,spark cassandra connector提供了2个功能:
joinWithCassandra
&leftJoinWithCassandra
-它们都只在RDD上工作。第一个函数接受rdd&返回一个新的rdd对,rdd对由左侧的原始数据和右侧的cassandra数据组成-如果没有行与原始数据对应,它将不会包含在结果中。第二个函数类似,但即使cassandra中没有行,也会将原始数据保留在rdd中—在这种情况下,右侧部分将为null。这两个函数通常用于执行有效的数据查找,例如,用附加信息丰富来自流的数据。在dse分析中,还支持针对cassandra的Dataframe连接,即所谓的“dse直接连接”-它比保存cassandra所有数据的spark连接到Dataframe更有效,因为它只从数据库中提取必要的记录,而不读取所有数据。