我尝试为rdd到hdfs中的每条记录编写多个文件(每个黑名单一个,按一个键分组),每个文件集上都应用了一个黑名单。
首先,我将multipletextoutputformat与keyby结合使用,按照记录中的一个字段对输出文件进行分组,效果很好。因此,我的输出文件现在由一个键命名,来自记录,记录被分组在这个文件中。
但我现在的问题是,我需要对输出应用黑名单,并分别保存每个输出。我用了一个简单的过滤器。现在发生的情况是,应用这个文件管理器会导致工作被执行x次,针对x个不同的黑名单。对于大量的记录来说,这是不可接受的。即使以前调用Dataframe上的缓存函数。
为了明确我想要输入的是一个巨大的分布式hdfs文件,其结构如下:
ID, Name, .
我的输出目录应该如下所示:
blacklisted1/ID1.file -> Content:(ID, Name, ...)
/ID2.file
blacklisted2/ID1.file
/ID2.file
目标是只读取每条记录一次,然后为其写入这些文件集。我目前的方法是使用foreachpartition,并通过迭代记录在hdfs中手动创建一个文件,但对我来说,必须有一个更简单的解决方案。
1条答案
按热度按时间kxkpmulp1#
因为您的密钥不能保证在一个黑名单中,所以我要做的是生成一个flatmap,在这里您将(key,value)rdd更改为(blacklist key,value),其中数据可能重复,然后使用multipletextoutputformat根据“blacklist/key”写入文件路径。类似于此:
然后您可以在rdd的一个过程中生成输出。假设这个rdd比原来的要大,这个工作流似乎更适合spark的优化。