cassandra 3.7 cdc/增量数据加载

hyrbngr7  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(429)

我对etl非常陌生,我希望用cassandra3.7和spark实现增量数据加载。我知道cassandra的更高版本确实支持cdc,但我只能使用Cassandra3.7。有没有一种方法可以让我只跟踪更改的记录并使用spark加载它们,从而执行增量数据加载?
如果在Cassandra方面做不到,在spark方面也欢迎任何其他建议:)

wmomyfyw

wmomyfyw1#

这是一个相当广泛的主题,有效的解决方案将取决于表中的数据量、表结构、数据的插入/更新方式等。此外,具体的解决方案可能取决于可用的spark版本。spark-only方法的一个缺点是,如果没有前一个状态的完整副本,就很难检测到数据的删除,因此可以生成两个状态之间的差异。
在所有情况下,您都需要执行全表扫描以查找更改的条目,但如果您的表是专门为此任务组织的,则可以避免读取所有数据。例如,如果您有一个具有以下结构的表:

create table test.tbl (
  pk int,
  ts timestamp,
  v1 ...,
  v2 ...,
  primary key(pk, ts));

如果执行以下查询:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
                              AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

然后spark cassandra连接器将此查询下推到cassandra,并将只读数据 ts 在给定的时间范围内-可以通过执行 filtered.explain 并检查两个时间过滤器是否标记有 * 符号。
检测更改的另一种方法是从cassandra检索写入时间,并根据该信息过滤出更改。对于所有最新版本的scc,rdd api都支持获取writetime,自scc 2.5.0发布以来,dataframe api也支持获取writetime(至少需要spark 2.4,但也可以与2.3一起使用)。获取此信息后,可以对数据应用筛选器并提取更改。但你需要记住几件事:
使用此方法无法检测删除
写入时间信息只存在于常规列和静态列,而不存在于主键列
如果插入后对行进行了部分更新,则每列可能有自己的写入时间值
在大多数版本的Cassandra,呼吁 writetime 函数对collection列(list/map/set)执行时将生成错误,并将/可能返回 null 对于具有用户定义类型的列
p、 即使您启用了cdc,正确使用它也不是一件小事:
您需要消除重复更改-您有更改的rf副本
某些更改可能会丢失,例如,当节点关闭时,然后稍后通过提示或修复进行传播
ttl不容易处理
...
对于疾病预防控制中心,您可以查看第2019届会议的演示文稿—有几次关于这个主题的演讲。

相关问题