Computing the statistical mode in Pig

camsedfj  posted on 2021-06-03  in  Hadoop

How can I compute the statistical mode of a dataset in Apache Pig without using a UDF?

A,20
A,10
A,10
B,40
B,40
B,20
B,10

data = LOAD 'myData.txt' USING PigStorage(',') AS (key, value);
byKey = GROUP data BY key;
mode = FOREACH byKey GENERATE MODE(data.value);  -- How to define MODE() ??
DUMP mode;
-- Correct answer:  (A, 10), (B, 40)

qv7cva1a1#

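Write an eval function (a user-defined function) named MODE.
In general, Pig ships with a very limited set of eval functions. Once you start doing anything more complex than MIN/MAX/COUNT, you need to get comfortable writing UDFs.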

qlckcl4x2#

I have a simple UDF here that computes the mode (it uses Apache commons-math3 and Pig 0.10.0):

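import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.math3.stat.Frequency;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Returns a bag holding every value that occurs most often in the input bag.
public class MODE extends EvalFunc<DataBag> {
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple inputTuple) throws IOException {
        if (inputTuple == null || inputTuple.size() == 0) {
            return null;
        }
        try {
            Frequency frequency = new Frequency();
            DataBag output = mBagFactory.newDefaultBag();
            DataBag values = (DataBag) inputTuple.get(0);
            // Count occurrences; the cast requires the values to be Pig longs.
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                Tuple tuple = it.next();
                frequency.addValue((Long) tuple.get(0));
            }
            Set<Long> setOfMostFrequentVals = new HashSet<Long>();
            Long greatestFrequency = 0L;
            for (Iterator<Comparable<?>> it = frequency.valuesIterator(); it.hasNext();) {
                Long val = (Long) it.next();
                if (frequency.getCount(val) >= greatestFrequency) {
                    // A strictly higher count discards the candidates collected so far.
                    if (frequency.getCount(val) > greatestFrequency) {
                        setOfMostFrequentVals.clear();
                        greatestFrequency = frequency.getCount(val);
                    }
                    // Equal counts are kept, so ties all end up in the result.
                    setOfMostFrequentVals.add(val);
                }
            }
            for (Long mostFrequentVal : setOfMostFrequentVals) {
                output.add(mTupleFactory.newTuple(mostFrequentVal));
            }
            return output;
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing mode in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }
}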
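
To check the tie handling outside a Pig job, the counting loop of the UDF above can be reproduced in plain Java. This is only a sketch, not the answerer's code: the class name ModeSketch and the method modes are made up for illustration, and a HashMap stands in for the commons-math3 Frequency class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Pig or commons-math3.
class ModeSketch {
    // Returns every value that occurs with the highest frequency (ties included).
    static Set<Long> modes(List<Long> values) {
        Map<Long, Long> counts = new HashMap<>();
        for (Long v : values) {
            counts.merge(v, 1L, Long::sum);
        }
        long greatest = 0L;
        Set<Long> result = new TreeSet<>();
        for (Map.Entry<Long, Long> e : counts.entrySet()) {
            long c = e.getValue();
            if (c > greatest) {
                // A strictly higher count invalidates earlier candidates.
                result.clear();
                greatest = c;
            }
            if (c == greatest) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(modes(List.of(20L, 10L, 10L)));      // key A from the question: [10]
        System.out.println(modes(List.of(40L, 40L, 20L, 10L))); // key B from the question: [40]
        System.out.println(modes(List.of(1L, 1L, 2L, 2L)));     // a tie, both are modes: [1, 2]
    }
}
```

An empty input yields an empty set here, whereas the UDF returns an empty bag for an empty input bag and null only for a null or empty input tuple.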

mm5n2pyu3#

Here is a version that finds only one result per key:

data = LOAD 'mode_data.dat' USING PigStorage(',') AS (key, value);
byKeyValue = GROUP data BY (key, value); 
cntKeyValue = FOREACH byKeyValue GENERATE FLATTEN(group) AS (key, value), COUNT(data) as cnt;
byKey = GROUP cntKeyValue BY key;
mode = FOREACH byKey {
    freq = ORDER cntKeyValue BY cnt DESC;
    topFreq = LIMIT freq 1; -- one of the most frequent values for key of the group
    GENERATE FLATTEN(topFreq.(key, value));
};

This version finds all values that tie for the highest frequency for the same key:

data = LOAD 'mode_data.dat' USING PigStorage(',') AS (key, value);
byKeyValue = GROUP data BY (key, value);
cntKeyValue = FOREACH byKeyValue GENERATE FLATTEN(group) AS (key, value), COUNT(data) as cnt;
byKey = GROUP cntKeyValue BY key;
mostFreqCnt = FOREACH byKey { -- calculate the biggest count for each key
    freq = ORDER cntKeyValue BY cnt DESC;
    topFreq = LIMIT freq 1;
    GENERATE FLATTEN(topFreq.(key, cnt)) as (key, cnt);
};

-- Get all values with the same count and the same key.
-- COGROUP is used because the next command was throwing some errors during execution.
modeAll = COGROUP cntKeyValue BY (key, cnt), mostFreqCnt BY (key, cnt);
mode = FOREACH (FILTER modeAll BY not IsEmpty(mostFreqCnt)) GENERATE FLATTEN(cntKeyValue.(key, value)) as (key, value);
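
The three stages of the script above (count each (key, value) pair, find the largest count per key, keep the pairs that reach it) can be traced on the question's sample data outside Pig. The following is a rough sketch in plain Java, with hypothetical names (ModePipelineSketch, modesByKey) and with values handled as strings, since the script loads the fields without declaring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; none of these names come from Pig.
class ModePipelineSketch {
    // rows holds {key, value} pairs; the result maps each key to all of its most frequent values.
    static Map<String, List<String>> modesByKey(List<String[]> rows) {
        // Stage 1 (cntKeyValue): count occurrences of each (key, value) pair.
        Map<String, Map<String, Long>> cnt = new TreeMap<>();
        for (String[] r : rows) {
            cnt.computeIfAbsent(r[0], k -> new TreeMap<>()).merge(r[1], 1L, Long::sum);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Long>> e : cnt.entrySet()) {
            // Stage 2 (mostFreqCnt): the largest count for this key.
            long max = 0L;
            for (long c : e.getValue().values()) {
                max = Math.max(max, c);
            }
            // Stage 3 (mode): keep every value whose count equals that maximum.
            List<String> modes = new ArrayList<>();
            for (Map.Entry<String, Long> vc : e.getValue().entrySet()) {
                if (vc.getValue() == max) {
                    modes.add(vc.getKey());
                }
            }
            result.put(e.getKey(), modes);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] {"A", "20"}, new String[] {"A", "10"}, new String[] {"A", "10"},
            new String[] {"B", "40"}, new String[] {"B", "40"}, new String[] {"B", "20"},
            new String[] {"B", "10"});
        System.out.println(modesByKey(rows)); // {A=[10], B=[40]}
    }
}
```

Unlike the single-result version, stage 3 compares against the maximum instead of taking the first row after sorting, which is why tied values all survive.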
