我有一个有边界的pcollection,并希望使用动态文件命名方案将输出持久化到s3 bucket。不幸的是,在emr(尝试过emr-6.2.0和emr-5.30.1)和spark runner上运行时,在管道中的所有步骤完成并且集群终止之后,s3上的输出在主输出目录中只包含一些预期的内容,尽管大部分内容放在.temp beam文件夹中,并且从未移动到主输出目录。其中大部分=90%的预期行未保留在正确命名的文件中,抽查表明预期行位于.temp beam文件夹中的文件中。以下是相关的管道声明部分:
PCollection<SomeObject> input; // is a bounded PCollection
FileIO.Write<String, SomeObject> write = FileIO.<String, SomeObject>writeDynamic()
.by(SomeObject::key)
.withDestinationCoder(StringUtf8Coder.of())
.withCompression(Compression.GZIP)
.withNaming((SerializableFunction<String, FileIO.Write.FileNaming>) key
-> (FileIO.Write.FileNaming) (window, pane, numShards, shardIndex, compression)
-> String.format("some_object_%s_%d.csv.gz", key, shardIndex))
.via(Contextful.fn(SomeObject::toCsvLine), Contextful.fn(x -> TextIO.sink().withHeader(SomeObject.HEADER)))
.to("s3://some-bucket/some-output-path");
input.apply("write-a-pcollection", write);
通过这个代码,我得到了一个s3 bucket,它看起来像:
一些对象\u key1 \u 0.csv.gz
一些对象\u key1 \u 1.csv.gz
一些对象\u key2 \u 0.csv.gz
一些对象\u key3 \u 0.csv.gz
.temp beam-,其中90%的预期内容保留在使用随机uuid命名的对象中
但是,当我加上 .withIgnoreWindowing()
对于writedynamic配置,输出似乎是完全正确的,并且没有留下.temp beam目录。但是这个方法已经被弃用了,javadocs中也没有提供任何替代方法(至少我找不到任何替代方法)。
为什么本文作者需要忽略窗口才能在这种情况下正常工作,以及管道应该是什么样子才能不使用它 .withIgnoreWindowing()
?
更新:
尝试在应用writedynamic之前添加全局窗口步骤(如注解中所示),但没有成功:
input.apply("wnd", Window.<SomeObject>into(new GlobalWindows())
.triggering(DefaultTrigger.of()))
暂无答案!
目前还没有任何答案,快来回答吧!