如何在不引起写操作的情况下计算累加器?

b1uwtaje  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(366)

我想在编写Dataframe之前执行轻量级验证。在编写之前,我必须通过“foo”序列化Dataframe。我正在递增“foo”中的累加器:

acc = sc.accumulator(0)
output = df.map(foo)
if acc.value < THRESHOLD:
 raise ValueError(f"Failed validation: {acc.value} < {THRESHOLD}")
output.write(path)

问题是 acc.value == 0 因为累加器在 output.write() ,我想避免,因为数据验证失败。什么是正确的设计模式?

pexxcrt2

pexxcrt21#

如果您的目标是在将数据发布到某个输出路径之前验证计数,只需将数据写入中间路径即可。然后计算累加器计数器,如果计数有效,则将中间路径重命名为实际输出目标。

acc = sc.accumulator(0)
output = df.map(foo)
output.write(tmp_path)
if acc.value < THRESHOLD:
 # fs.delete(tmp_path)
 raise ValueError(f"Failed validation: {acc.value} < {THRESHOLD}")
else fs.rename(tmp_path, path)

相关问题