The try block executes again and again for every tuple in the DataBag

lmvvr0a8 posted on 2021-05-29 in Hadoop

Here is the code:

package PigUDFpck; // package inferred from the call PigUDFpck.databag(*) in the script below

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class databag extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    private DataBag result;
    private String delimiterType = ": Src / dest :";

    public DataBag exec(Tuple input) throws IOException {
        try {
            result = mBagFactory.newDefaultBag(); // change here
            result.add(input);

            getLogger().info("::::::: Entered try block ::::::::::::");

            // Index maps for source and destination rows (key -> row positions).
            ConcurrentHashMap<Object, ArrayList<Integer>> srcIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();
            ConcurrentHashMap<Object, ArrayList<Integer>> destIndexMap = new ConcurrentHashMap<Object, ArrayList<Integer>>();

            // Rows converted to Object[] and collected here.
            ArrayList<Object[]> source = new ArrayList<Object[]>();
            ArrayList<Object[]> destination = new ArrayList<Object[]>();

            int srcCounter = 0;
            int destCounter = 0;

            ArrayList<Integer> Sourcearray = new ArrayList<Integer>();
            ArrayList<Integer> Destinationarray = new ArrayList<Integer>();

            for (Iterator<Tuple> iter = result.iterator(); iter.hasNext();) {
                // some code here
            }
            // ... rest of exec() elided in the question ...
            return result;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}

I tried to iterate over the tuples in the DataBag with a for loop, but for every tuple all the collections get reinitialized; in other words, execution starts from the try block again for each tuple.
Output:

INFO  PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap={}
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method
PigUDFpck.databag - ::::::: Entered try block ::::::::::::
PigUDFpck.databag - srcIndexMap={}
PigUDFpck.databag - inside main if loop skey=4
PigUDFpck.databag - destIndexMap.contains(skey)=false
PigUDFpck.databag - into else loop of main method

UPDATE
Pig script:

REGISTER /usr/local/pig/UDF/UDFBAG.jar;

sourcenew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Source1.txt' USING PigStorage(',') as (ID:int,Name:chararray,FirstName:chararray ,LastName:chararray,Vertical_Name:chararray ,Vertical_ID:chararray,Gender:chararray,DOB:chararray,Degree_Percentage:chararray ,Salary:chararray,StateName:chararray);

destnew = LOAD 'hdfs://HADOOPMASTER:54310/DVTTest/Destination1.txt' USING PigStorage(',') as (ID:int,Name:chararray,FirstName:chararray ,LastName:chararray,Vertical_Name:chararray ,Vertical_ID:chararray,Gender:chararray,DOB:chararray,Degree_Percentage:chararray ,Salary:chararray,StateName:chararray);

cogroupnew = COGROUP sourcenew BY ID inner, destnew BY ID inner;

diff_data = FOREACH cogroupnew GENERATE DIFF(sourcenew,destnew);

ids = FOREACH diff_data GENERATE FLATTEN($0);

id1 = DISTINCT( FOREACH ids GENERATE $0);

src = FILTER sourcenew BY ID == id1.$0;

finalsrc = FOREACH src GENERATE *, 'Source' as Source:chararray;

dest = FILTER destnew BY ID == id1.$0;

finaldest = FOREACH dest GENERATE *, 'Destination' as Destination:chararray;

final =  UNION finalsrc,finaldest ;

A = FOREACH final GENERATE PigUDFpck.databag(*);

DUMP A;

The input to the UDF looks like this:

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,Source)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination)

(4,JOHN Hansel,JOHN,Hansel,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination)

Any help is much appreciated! Thanks in advance!

djp7away 1#

Please understand that Pig is a DAG generator: it generates MapReduce jobs based on that DAG.
Higher-level Pig constructs such as LOAD, FOREACH, and JOIN boil down to lower-level MR constructs:

> LOAD      => Mapper in MR
> GENERATE  => a function call in Mapper or Reducer
> JOIN      => SHUFFLE (join in Map Reduce)
> FILTER    => filter function in Map or Reduce

When the function call is made inside the Mapper or the Reducer, the databag function is not called once but many times:
databag's exec() is executed for every input row (depending on whether the databag UDF becomes part of the mapper or of the reducer).
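
A minimal sketch that makes this per-row behavior visible (the class name PerRowDemo is hypothetical, not from the question): the instance field survives across exec() calls within one map/reduce task, while anything assigned inside exec() is rebuilt on every call, which is exactly why the collections in the question keep getting reinitialized.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class PerRowDemo extends EvalFunc<Integer> {
    private int calls = 0; // instance field: survives across exec() calls in one task

    public Integer exec(Tuple input) throws IOException {
        calls++; // increments once per input row
        // Anything assigned here is rebuilt on every call, just like the
        // maps and lists inside the try block of the question's UDF.
        getLogger().info("exec() invocation #" + calls);
        return calls;
    }
}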
Use the EXPLAIN command in Pig, which shows how a Pig script is translated into the underlying MR jobs.
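
For example, running it on the alias from the script above that invokes the UDF prints the logical, physical, and MapReduce plans:

EXPLAIN A;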
For more details, see:
http://bytepadding.com/big-data/map-reduce/pig-to-map-and-reduce/
http://bytepadding.com/big-data/map-reduce/understanding-map-reduce-the-missing-guide/

ig9co6j1 2#

Well, this got a bit too big for a comment.

...
src = FILTER sourcenew BY ID == id1.$0;

finalsrc = FOREACH src GENERATE *, 'Source' as Source:chararray;

dest = FILTER destnew BY ID == id1.$0;

finaldest = FOREACH dest GENERATE *, 'Destination' as Source:chararray;

final = UNION finalsrc, finaldest;

A = FOREACH (GROUP final BY ID) {
    src = FILTER final BY Source == 'Source';
    dest = FILTER final BY Source == 'Destination';
    GENERATE FLATTEN(PigUDFpck.databag(src, dest));
}

In this case the UDF will receive a tuple containing two bags, which you can then compare. Also, I'm pretty sure this can be simplified (I mean, you can do the UNION and the GROUP right after loading; just generate a flag for each relation telling whether it is source or destination), as sketched below.
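
A minimal sketch of that simplification, reusing the LOAD aliases (sourcenew, destnew) and the UDF from the question; untested, and the flag column name is illustrative:

finalsrc = FOREACH sourcenew GENERATE *, 'Source' as Source:chararray;
finaldest = FOREACH destnew GENERATE *, 'Destination' as Source:chararray;

final = UNION finalsrc, finaldest;

A = FOREACH (GROUP final BY ID) {
    src = FILTER final BY Source == 'Source';
    dest = FILTER final BY Source == 'Destination';
    GENERATE FLATTEN(PigUDFpck.databag(src, dest));
}

DUMP A;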
