我正在写一个Pig的剧本,看起来如下:
...
myGroup = group simplifiedJoinData by (dir1, dir2, dir3, dir4);
betterGroup = foreach myGroup {
value1Value2 = foreach simplifiedJoinedGroup generate value1, value2;
distinctValue1Value2 = DISTINCT value1Value2; generate group, distinctValue1Value2;
}
store betterGroup into '/myHdfsPath/myMultiStorageTest' using MyMultiStorage('output', '0', 'none' );
请注意,simplifiedjoindata的架构是simplifiedjoinedgroup:{dir1:long,dir2:long,dir3:chararray,dir4:chararray,value1:chararray,value2:chararray}
它使用一个定制的存储类(mymultistorage-基本上是piggybank中multistorage的一个修改版本)来编写多个输出文件。自定义存储类希望传递给它的值采用以下格式:
{group:(dir1:long,dir2:long,dir3:chararray,dir4:chararray), bag:{(value1:chararrary,value2:chararray)}}
我希望自定义存储类可以输出多个文件,如下所示:dir/dir2/dir3/dir4/value1\u values.txt dir/dir2/dir3/dir4/value2\u values.txt
其中value1_values.txt包含所有value1值,value2_values.txt包含所有value2值。理想情况下,我不希望编写多个稍后必须合并的部件文件(注意,为了讨论的目的,已经简化了示例)。真正的输出文件是二进制结构,不能与简单的cat组合在一起。我有这个工作的小数据集;但是,当我使用更大的数据集运行时,我会遇到这样的问题:在hadoop中,输出文件名已经存在或者已经创建了:
java.io.IOException: org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException
我怀疑这是因为多个Map器或还原器试图写入同一个文件,而我没有像pigstorage那样在文件名中使用部件id。但是,我希望通过对数据进行分组,每个dir1、dir2、dir3、dir4组合只有一条记录,因此,只有一个Map器或reducer尝试为给定的运行编写特定的文件。我尝试过对map和reduce任务都不执行推测性执行,但似乎没有效果。显然我不明白这里发生了什么。
我的问题是:为什么我得到alreadybeingcreatedexception?
如果没有办法让一个reducer为每条记录写入所有数据,那么可以接受在一个目录中写入多个部件输出文件(每个reducer一个),然后合并它们。只是不太理想。但是,到目前为止,我还无法确定让自定义存储类确定唯一文件名的正确方法,并且我仍然会遇到多个reducer试图创建/写入同一个文件。作业配置或上下文中是否有特定方法允许我协调作业中的各个部分?
提前感谢您提供的任何帮助。
1条答案
按热度按时间6ss1mwsb1#
结果发现,由于元组分析错误,我生成了相同的文件名。我得到alreadybeingcreatedexception就是因为这个原因。
自定义存储函数没有问题,或者以这种方式处理问题。我犯了个愚蠢的错误!