如何排序逗号分隔键在减速机输出?

lo8azlld  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(231)

我正在运行一个使用mapreduce的rfm分析程序。outputkeyclass是text.class,我发出逗号分隔的r(recent)、f(frequency)、m(monetory)作为reducer的键,其中r=biginteger、f=binInteger、m=bigdecimal,值也是表示客户id的文本。我知道hadoop会根据键对输出进行排序,但我的最终结果有点奇怪。我希望输出键先按r排序,然后按f排序,然后按m排序。但由于未知原因,我得到了以下输出排序顺序:

545,1,7652    100000
545,23,390159.402343750    100001
452,13,132586    100002
452,4,32202    100004
452,1,9310    100007
452,1,4057    100018
452,3,18970    100021

但我想要以下输出:

545,23,390159.402343750    100001
545,1,7652    100000
452,13,132586    100002
452,4,32202    100004
452,3,18970    100021
452,1,9310    100007
452,1,4057    100018

注意:customer\u id是map阶段的键,属于特定customer\u id的所有rfm值都在reducer中聚集在一起进行聚合。

xwbd5t1u

xwbd5t1u1#

因此,经过大量的搜索,我找到了一些有用的资料,我现在正在发布这些资料的汇编:
必须从自定义数据类型开始。因为我有三个逗号分隔的值,需要按降序排序,所以我必须创建一个 TextQuadlet.java hadoop中的数据类型。我之所以要创建一个quadlet,是因为键的第一部分是自然键,其余三部分是r,f,m:

import java.io.*;
import org.apache.hadoop.io.*;
public class TextQuadlet implements WritableComparable<TextQuadlet> {
private String customer_id;
private long R;
private long F;
private double M;
public TextQuadlet() {
}
public TextQuadlet(String customer_id, long R, long F, double M) {
    set(customer_id, R, F, M);
}
public void set(String customer_id2, long R2, long F2, double M2) {
    this.customer_id = customer_id2;
    this.R = R2;
    this.F = F2;
    this.M=M2;
}
public String getCustomer_id() {
    return customer_id;
}
public long getR() {
    return R;
}
public long getF() {
    return F;
}
public double getM() {
    return M;
}
@Override
public void write(DataOutput out) throws IOException {
    out.writeUTF(this.customer_id);
    out.writeLong(this.R);
    out.writeLong(this.F);
    out.writeDouble(this.M);
}
@Override
public void readFields(DataInput in) throws IOException {
    this.customer_id = in.readUTF();
    this.R = in.readLong();
    this.F = in.readLong();
    this.M = in.readDouble();
}
// This hashcode function is important as it is used by the custom
// partitioner for this class.
@Override
public int hashCode() {
    return (int) (customer_id.hashCode() * 163 + R + F + M);
}
@Override
public boolean equals(Object o) {
    if (o instanceof TextQuadlet) {
        TextQuadlet tp = (TextQuadlet) o;
        return customer_id.equals(tp.customer_id) && R == (tp.R) && F==(tp.F) && M==(tp.M);
    }
    return false;
}
@Override
public String toString() {
    return customer_id + "," + R + "," + F + "," + M;
}
// LHS in the conditional statement is the current key
// RHS in the conditional statement is the previous key
// When you return a negative value, it means that you are exchanging
// the positions of current and previous key-value pair
// Returning 0 or a positive value means that you are keeping the
// order as it is
@Override
public int compareTo(TextQuadlet tp) {
// Here my natural is is customer_id and I don't even take it into
// consideration.

// So as you might have concluded, I am sorting R,F,M descendingly.
    if (this.R != tp.R) {
        if(this.R < tp.R) {
            return 1;
        }
        else{
            return -1;
        }
    }
    if (this.F != tp.F) {
        if(this.F < tp.F) {
            return 1;
        }
        else{
            return -1;
        }
    }
    if (this.M != tp.M){
        if(this.M < tp.M) {
            return 1;
        }
        else{
            return -1;
        }
    }
    return 0;
}
public static int compare(TextQuadlet tp1, TextQuadlet tp2) {
    int cmp = tp1.compareTo(tp2);
    return cmp;
}
public static int compare(Text customer_id1, Text customer_id2) {
    int cmp = customer_id1.compareTo(customer_id1);
    return cmp;
}
}

接下来,您将需要一个自定义分区器,以便具有相同键的所有值都在一个缩减器中结束:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner_RFM extends Partitioner<TextQuadlet, Text> {
@Override
public int getPartition(TextQuadlet key, Text value, int numPartitions) {
    return (int) key.hashCode() % numPartitions;
   }
}

第三,您需要一个自定义的组比较程序,以便所有值都按其自然键(即 customer_id 而不是复合键 customer_id,R,F,M :

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupComparator_RFM_N extends WritableComparator {
protected GroupComparator_RFM_N() {
    super(TextQuadlet.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
    TextQuadlet ip1 = (TextQuadlet) w1;
    TextQuadlet ip2 = (TextQuadlet) w2;
    // Here we tell hadoop to group the keys by their natural key.
    return ip1.getCustomer_id().compareTo(ip2.getCustomer_id());
    }
}

第四,您将需要一个密钥比较器,它将再次根据r、f、m逐次对密钥进行排序,并实现中使用的相同排序技术 TextQuadlet.java . 因为我在编码时迷路了,所以我稍微改变了在这个函数中比较数据类型的方式,但是底层逻辑与 TextQuadlet.java :

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class KeyComparator_RFM extends WritableComparator {
protected KeyComparator_RFM() {
    super(TextQuadlet.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
    TextQuadlet ip1 = (TextQuadlet) w1;
    TextQuadlet ip2 = (TextQuadlet) w2;
    // LHS in the conditional statement is the current key-value pair
    // RHS in the conditional statement is the previous key-value pair
    // When you return a negative value, it means that you are exchanging
    // the positions of current and previous key-value pair
    // If you are comparing strings, the string which ends up as the argument
    // for the `compareTo` method turns out to be the previous key and the
    // string which is invoking the `compareTo` method turns out to be the
    // current key.
    if(ip1.getR() == ip2.getR()){
        if(ip1.getF() == ip2.getF()){
            if(ip1.getM() == ip2.getM()){
                return 0;
            }
            else{
                if(ip1.getM() < ip2.getM())
                    return 1;
                else
                    return -1;
            }
        }
        else{
            if(ip1.getF() < ip2.getF())
                return 1;
            else
                return -1;
        }
    }
    else{
        if(ip1.getR() < ip2.getR())
            return 1;
        else
            return -1;
        }
    }
}

最后,在驱动程序类中,必须包含我们的自定义类。这是我用过的 TextQuadlet,Text 作为k-v对。但你可以根据自己的需要选择其他课程

job.setPartitionerClass(FirstPartitioner_RFM.class);
job.setSortComparatorClass(KeyComparator_RFM.class);
job.setGroupingComparatorClass(GroupComparator_RFM_N.class);
job.setMapOutputKeyClass(TextQuadlet.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextQuadlet.class);
job.setOutputValueClass(Text.class);

请纠正我,如果我在技术上出错的地方,在代码或解释,因为我已经根据这个答案纯粹是我个人的理解,从我在互联网上读到的,它对我的作品完美。

相关问题