如何在hadoop pig中按顺序连接关系?

lztngnrs  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(313)

我有这样一行数据:

a\tb1,b2,..,bn\tc1,c2,..,cn

其中n是不确定的。现在,我想把它转换成这样的线条:

a\tb1\tc1
a\tb2\tc2
...
a\tbn\tcn

有没有可能用pig拉丁语,或者必须用udf?如果使用脚本:

A = LOAD 'file' AS (a, b, c);
B = FOREACH A GENERATE a, FLATTEN(TOKENIZE(b)), FLATTEN(TOKENIZE(c));
dump B;

我将得到如下结果:

a\tb1\tc1
a\tb1\tc2
..
a\tb1\tcn
a\tb2\tc1
a\tb2\tc2
..
a\tb2\tcn
..

这不是我想要的数据。有人有主意吗?

yzuktlbb

yzuktlbb1#

在我看来,太多使用pig的人不愿意编写自定义项。在你的例子中,你需要做的自定义项是相当简单的。下面是示例代码(未测试)

public class InSequenceJoin extends EvalFunc<DataBag>
{
    public DataBag exec(Tuple input) throws IOException {
        String b = (String) input.get(0);
        String c = (String) input.get(1);
        String[] bArray = b.split(",");
        String[] cArray = c.split(",");
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        for (int i = 0; i < bArray.length && i < cArray.length; i++) {
            Tuple tuple = TupleFactory.getInstance.newTuple(2);
            tuple.set(0, bArray[i]);
            tuple.set(1, cArray[i]);
            bag.add(tuple);
        }
        return bag;
    }
}

define InSequenceJoin mysourcepath.InSequenceJoin();
A = LOAD 'file' AS (a, b, c);
B = FOREACH A GENERATE a, FLATTEN(InSequenceJoin(b,c));
dump B;

如果需要,可以在udf中添加数组大小是否匹配的验证。您可以将我在示例中使用的字符串拆分替换为您真正需要的任何内容。

a9wyjsp7

a9wyjsp72#

我会尝试使用datafu的包自定义项。
按您所做的方式加载数据,然后使用enumerate枚举包元素,然后展平(如您所见,这将提供包元素之间的交叉连接),然后可以根据添加到包元素的索引进行筛选。
请看这里:https://github.com/linkedin/datafu

相关问题