解决:我解决了这个问题,这是由于在流程的第一个通道中的一个非常愚蠢、愚蠢、愚蠢的错误。基本上,我是在计算一个写进配置单元表的Dataframe;然后需要使用这个Dataframe来创建 temporaryDF
在许多段落之后,我最初是从头开始查询表,而不是使用Dataframe的副本写入表中。错误在于,刚刚计算的Dataframe丢失了之前的分区(由于流的特定逻辑),而下一次的计算需要创建 temporaryDF
还需要至少两个以前的分区。我不知道为什么,我不记得什么时候,我决定缓存刚刚计算的一个,从而丢失信息并在oozie下得到一个空分区(在spark shell中,我总是使用至少三个分区,因为一段时间后手动更新表-每个新分区每15分钟出现一次)。我可能是在深夜工作冲刺和我的大脑认为这是值得搞砸了。
我投了赞成票,接受了“蓝色幻影”的回答,因为他在我描述的具体情况下是非常正确的。
原文:我有一个奇怪的行为使用 Spark-Submit
hadoop2中的sparkv.2.2.0.2.6.4.105-1(使用scala)在oozie工作流下与使用 Spark-Shell
.
我有一个配置单元表,其中包含每15分钟跟踪一次某些进程的记录。每次新记录或“旧”记录仍满足感兴趣进程的逻辑时,表都会被覆盖。
我通过一个专栏来记录这些记录的年代 times_investigated
,范围从1到9。
我创建了一个临时Dataframe,我们称之为 temporayDF
,它同时包含旧条目和新条目(两种类型都需要存在才能运行有用的计算)。这个 temporayDF
然后根据 $"times_investigated" === 1
以及 $"times_investigated > 1"
(或 =!= 1
). 然后,处理后的条目与 union
在最后的Dataframe中,然后将其写入原始配置单元表。
// Before, I run the query on the 'old' Hive table and the logic over old and new entries.
// I now have a temporary dataframe
val temporaryDF = previousOtherDF
.withColumn("original_col_new", conditions)
.withColumn("original_other_col_new", otherConditions)
.withColumn("times_investigated_new", nvl($"times_investigated" + 1, 1))
.select(
previousColumns,
$"original_col_new".as("original_col"),
$"original_other_col_new".as("original_other_col"),
$"times_investigated_new".as("times_investigated"))
.cache
// Now I need to split the temporayDF in 2 to run some other logic on the new entries.
val newEntriesDF = temporaryDF
.filter($"times_investigated" === 1)
.join(neededDF, conditions, "leftouter")
.join(otherNeededDF, conditions, "leftouter")
.groupBy(cols)
.agg(min(colOne),
max(colTwo),
min(colThree),
max(colFour))
.withColumn("original_col_five_new",
when(conditions).otherwise(somethingElse))
.withColumn("original_col_six_new",
when(conditions).otherwise(somethingElse))
.select(orderedColumns)
val oldEntriesDF = temporaryDF.filter($"times_investigated" > 1)
val finalTableDF = oldEntriesDF.union(newEntriesDF)
// Now I write the table
finalTableDF.createOrReplaceTempView(tempFinalTableDF)
sql("""INSERT OVERWRITE TABLE $finalTableDF
SELECT * FROM tempFinalTableDF """)
// I would then need to re-use the newly-computed table to process further information...
问题是:配置单元表没有为新条目提供 times_investigated
= 1. 它只处理旧的条目,因此,在一个条目可以停留在表中的9次之后,它就完全空了。
我在sparkshell中运行了一些测试,在许多次迭代中,一切都运行得很好,甚至从shell手动编写配置单元表也在配置单元表中产生了预期的结果,但是当我在oozie下启动工作流时,奇怪的行为又出现了。
我在sparkshell中注意到,在编写了Hive表之后,如果我去计算 temporaryDF.show()
,新条目将更新为 $"times_investigated"
= 2!
我试着复制 temporaryDF
使用新条目和旧条目处理不同的Dataframe copyOfTemporaryDF
写入配置单元表后更新。
似乎这种重新计算是在oozie下编写配置单元表之前发生的。
我知道我可以用一种不同的方式来计算操作,但是如果可能的话,我需要在当前流上找到一个快速的临时解决方案。
最重要的是,我很想了解引擎盖下正在发生的事情,以避免自己以后陷入这种情况。
你们有什么线索和/或建议吗?
我尝试缓存中间Dataframe,但没有成功。
p、 很抱歉可能是糟糕的编码实践
编辑。更多背景: temporaryDF
来自其他中间Dataframe,仅用于一次计算感兴趣的Dataframe。最后一段 temporaryDF
是 withColumn
操作,在哪里 $"times_investigated"
使用自定义 nvl
函数(其工作方式与sql函数完全相同),并且在旧版本的流中从未出现问题(参见下面的段落)。
edit2:我还尝试在一个长链序列中合并对新条目和旧条目的操作,以便 temopraryDF
实际上是要写入配置单元表中的最终Dataframe,但是 times_investigated
=1仍然没有考虑(但是我没有问题) Spark-Shell
以及 .show
将Dataframe写入表后进行重新计算,因此所调查的时间为+1)。
1条答案
按热度按时间nvbavucw1#
使用.cache,否则将得到重新计算。如果rdd或df要在单Spark应用程序中多次使用,那么您应该为适当的Dataframe或rdd执行此操作—甚至不依赖于操作,有时您会得到“跳过的阶段”。
2 VAL使用temporarydf,而不缓存重新计算,正如您所看到的,它们可能会给出不同的结果。应该缓存的。
当然,如果一个工作进程死亡,或者分区被逐出,则需要重新计算。
.cache对于大于可用群集内存的数据集可能不理想。被逐出的每个分区都将从源代码处重建,这是一件代价高昂的事情。
另外,使用适当的分区和迭代几次可能比持久化/缓存更好;但这要视情况而定。