我的使用案例:
1.从cassandra表创建 Dataframe 。
1.通过对列进行筛选并修改该列的值来创建输出 Dataframe 。
1.将输出 Dataframe 写入cassandra,并设置TTL,以便在短时间(2秒)后删除所有修改的记录
1.将输出 Dataframe 返回给调用者,调用者在一段时间后将其写入文件系统。我只能将一个 Dataframe 返回给调用者,并且我没有进一步的控制权。另外,我不能增加TTL。
在执行步骤4时,输出 Dataframe 为空,这是因为spark重新评估了操作上的 Dataframe ,并且由于沿袭,再次执行了cassandra查询,现在没有产生任何记录。
为了避免这种情况,我在步骤2之后添加了一个步骤:
2a)outputDataframe.cache()
这确保了在第5步中,cassandra不会被查询,并且我也在我的文件中得到了所需的输出记录。我对这种方法有以下查询:
1.在spark没有找到缓存数据(缓存查找失败)的情况下,它是否有可能向上遍历世系并运行cassandra查询?如果有,在所有情况下如何避免这种情况?
1.我看到了另一种缓存方式:df.rdd.cache()
。这与在 Dataframe 上调用cache()
有什么不同吗?
作为参考,我当前的代码如下所示:
//1
val dfOrig = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
.load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
.mode("append")
.save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv")
1条答案
按热度按时间lf3rwulv1#
在Spark没有找到缓存数据(缓存查找失败)的情况下,它是否有可能沿着世系向上运行Cassandra查询?
可以。缓存的数据可以被该高速缓存清除器删除(主要在
MEMORY_ONLY
模式下),也可以在相应的节点被取消使用(崩溃、抢占、通过动态分配释放)时丢失。此外,其他选项(如推测执行)也会影响缓存行为。最后,数据可能没有完全缓存在第一位。
如果是,在所有情况下如何避免这种情况?
如果你需要很强的一致性保证,不要使用
cache
/persist
--它的设计并没有考虑到像这样的用例,而是将数据导出到一个持久、可靠的存储(比如HDFS),然后从那里读取。您还可以将
checkpoint
与HDFScheckpointDir
配合使用。您可能会尝试使用更可靠的缓存模式,如
MEMORY_AND_DISK_2
-这可能会降低重新计算数据的可能性,代价是这和在 Dataframe 上调用cache()有什么不同吗?
这是不同的(主要的区别是序列化策略),但当涉及到这个问题范围内感兴趣的属性时就不同了。
重要提示:
请注意,高速缓存行为可能不是代码中最大的问题。从单个表阅读和追加到单个表可能会导致复杂管道中各种不需要的或未定义的行为,除非采取额外的步骤来确保读取器不会选取新写入的记录。