Spark以不同的TTL写入Cassandra

quhf5bfb  于 2023-10-18  发布在  Cassandra
关注(0)|答案(2)|浏览(106)

在Java Spark中,我有一个包含'bucket_timestamp'列的框架,它表示行所属的存储桶的时间。
我想把框架写在Cassandra DB上。必须使用TTL将数据写入DB。TTL应该取决于存储桶时间戳-其中每行的TTL应该计算为ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp),其中CONST_TTL是我配置的常量TTL。
目前,我正在使用一个常量TTL用spark写Cassandra,代码如下:

df.write().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "key_space_name");
                    put("table, "table_name");
                    put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
                }
            }).mode(SaveMode.Overwrite).save();

我想到的一种可能的方法是-对于每个可能的bucket_timestamp -根据时间戳过滤数据,计算TTL并将过滤后的数据写入Cassandra。但这看起来效率很低,而且不是Spark的方式。在Java Spark中是否有一种方法可以提供一个spark列作为TTL选项,以便TTL对于每行都不同?
解决方案应该使用Java和数据集< Row>:我遇到了一些在scala中使用RDD执行此操作的解决方案,但没有找到使用Java和JavaScript的解决方案。
谢谢你,谢谢

6ojccjat

6ojccjat1#

更新:自2020年5月发布的Spark Cassandra Connector 3.0以来,DataFrame API中对该功能的支持已经可用。
老答案:
DataFrame API不支持这种功能,但...有JIRA为它-https://datastax-oss.atlassian.net/browse/SPARKC-416,你可以看它得到通知时,它的实施.
所以你唯一的选择就是使用RDD API,正如@bartosz25的回答中所描述的那样。

uelo1irk

uelo1irk2#

从Spark-Cassandra连接器选项(https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java)中,您可以将TTL设置为:

  • 常数值(withConstantTTL
  • 自动解析值(withAutoTTL
  • 基于列的值(withPerRowTTL

在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则计算TTL作为起始Dataset的新列。
对于用例,你可以在这里看到测试:https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala#L612

相关问题