apache pig udf和outputschema定制

ht4b089n  于 2021-06-24  发布在  Pig
关注(0)|答案(2)|浏览(358)

我正在尝试实现udf函数来处理各种源/输入文件。输入文件的列数不同。我的目的是要有通用的自定义函数。每次运行pig脚本都会处理一种类型的输入文件(由“|”分隔的记录数相同)。
udf函数应该读取由分隔符(|)分隔的所有输入记录,并根据某些条件生成一个包含两个元组的包,例如输入(1,2,3,4,5,6)输出a){(1,3)、(2,4,5,6)}或b){(2,3,4)、(1,5,6)}
我无法扩展outputschema方法来处理不同大小元组的创建。无法将额外的参数传递给outputschema方法。不能使用作为evalfunc类定义的一部分定义的临时变量,因为每次运行时它的值都为null。
有什么提示吗?谢谢您
更新:
我使用grunt执行下面的命令,inputschema是在“as”之后提供的

sourceData = foreach sourceData generate com.pig.Data('test.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));

自定义项代码在这里。。。

public Schema outputSchema(Schema input) {

(第233行)system.out.println(“

vyswwuz2

vyswwuz21#

-----”+input.getfields().size());
错误:

Pig Stack Trace
---------------
ERROR 1200: java.lang.NullPointerException

Failed to parse: java.lang.NullPointerException
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:201)
        at org.apache.pig.PigServer$Graph.validateQuery(PigServer.java:1707)
        at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1680)
        at org.apache.pig.PigServer.registerQuery(PigServer.java:623)
        at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:1082)
        at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:505)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
        at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66)
        at org.apache.pig.Main.run(Main.java:565)
        at org.apache.pig.Main.main(Main.java:177)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:306)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.getFieldSchema(UserFuncExpression.java:244)
        at org.apache.pig.newplan.logical.optimizer.FieldSchemaResetter.execute(SchemaResetter.java:264)
        at org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor.visit(AllSameExpressionVisitor.java:143)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.accept(UserFuncExpression.java:113)
        at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70)
        at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visitAll(SchemaResetter.java:67)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:122)
        at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:245)
        at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:114)
        at org.apache.pig.parser.LogicalPlanBuilder.buildForeachOp(LogicalPlanBuilder.java:1055)
        at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15896)
        at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)

        at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
        at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
        ... 16 more
Caused by: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:233)
        ... 34 more
================================================================================

更新2:
好的,输入模式是从上一个pig命令传播的。。。
sourcedata=load'test.csv',使用pigstorage(',')作为(vin:chararray,生日:chararray,姓名:chararray,客户ID:chararray,里程:chararray,油耗:chararray);
sourcedata=foreach sourcedata generate com.pig.data'test_data_desc.json',*)as(t:(s:(vin:chararray,birthdate:chararray),n:(name:chararray,customerid:chararray,miliege:chararray,fuel\u consumption:chararray));
这是没有用的-(因为它不可能传播任何额外的属性或它不可能在outputschema方法中创建任何其他更复杂的逻辑-(

ljo96ir5

ljo96ir52#

在outputschema函数中,您可以访问输入模式,并使用输入模式信息根据输入动态生成输出模式(如果输入以某种方式反映了预期的输出)。例子:

public Schema outputSchema(Schema input) {
    Schema mySchema = new Schema();
    if (input.getFields().size() == 3) {
      mySchema.add(new Schema.FieldSchema("data1", DataType.DOUBLE));
      mySchema.add(new Schema.FieldSchema("data2", DataType.DOUBLE));
      mySchema.add(new Schema.FieldSchema("data3", DataType.DOUBLE));
    } else {
      mySchema.add(new Schema.FieldSchema("data", DataType.CHARARRAY));
    }
    return mySchema;
  }

我希望这有帮助。

相关问题