在Java Spark中,我有一个包含'bucket_timestamp'列的框架,它表示行所属的存储桶的时间。
我想把框架写在Cassandra DB上。必须使用TTL将数据写入DB。TTL应该取决于存储桶时间戳-其中每行的TTL应该计算为ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp)
,其中CONST_TTL
是我配置的常量TTL。
目前,我正在使用一个常量TTL用spark写Cassandra,代码如下:
df.write().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "key_space_name");
put("table, "table_name");
put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
}
}).mode(SaveMode.Overwrite).save();
我想到的一种可能的方法是-对于每个可能的bucket_timestamp -根据时间戳过滤数据,计算TTL并将过滤后的数据写入Cassandra。但这看起来效率很低,而且不是Spark的方式。在Java Spark中是否有一种方法可以提供一个spark列作为TTL选项,以便TTL对于每行都不同?
解决方案应该使用Java和数据集< Row>:我遇到了一些在scala中使用RDD执行此操作的解决方案,但没有找到使用Java和JavaScript的解决方案。
谢谢你,谢谢
2条答案
按热度按时间6ojccjat1#
更新:自2020年5月发布的Spark Cassandra Connector 3.0以来,DataFrame API中对该功能的支持已经可用。
老答案:
DataFrame API不支持这种功能,但...有JIRA为它-https://datastax-oss.atlassian.net/browse/SPARKC-416,你可以看它得到通知时,它的实施.
所以你唯一的选择就是使用RDD API,正如@bartosz25的回答中所描述的那样。
uelo1irk2#
从Spark-Cassandra连接器选项(https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java)中,您可以将TTL设置为:
withConstantTTL
)withAutoTTL
)withPerRowTTL
)在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则计算TTL作为起始
Dataset
的新列。对于用例,你可以在这里看到测试:https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/TableWriterSpec.scala#L612