我正在处理hadoop中的一个输入日志文件,其中密钥分布不均匀。这意味着减速器的值分布不均匀。例如,key1有1个值,key2有1000个值。有没有办法对与同一个键相关联的值进行负载平衡[我不想同时修改我的键]
eulz3vhy1#
如果您知道哪些键的值将非常大,那么可以使用以下技巧。您可以实现自定义 Partitioner 这将确保每个倾斜的密钥都被分配到一个分区,然后其他所有的密钥都将被分配到剩余的分区 hashCode (默认值是多少 HashPartitioner 是的)。您可以创建自定义 Partitioner 通过实现此接口:
Partitioner
hashCode
HashPartitioner
public interface Partitioner<K, V> extends JobConfigurable { int getPartition(K key, V value, int numPartitions); }
然后告诉hadoop使用 Partitioner 使用:
conf.setPartitionerClass(CustomPartitioner.class);
t1qtbnec2#
也许你可以在打减速器之前用一个组合器?这是相当投机的。。。其思想是将每组密钥划分为预设最大大小的分区,然后将这些分区的k/v对输出到reducer。此代码假定您在配置中的某个位置设置了该大小。
public static class myCombiner extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<Text> textList = new ArrayList<Text>(); int part = 0; while (values.iterator().hasNext()) { if (textList.size() <= Integer.parseInt(context.getConfiguration().get("yourMaxSize"))) { textList.add(values.iterator().next()); } else { for(Text t : textList) { //essentially partitioning each key... context.write(new Text(key.toString() + "_" + Integer.toString(part)), t); } textList.clear(); } part += 1; } //output any stragglers ... for(Text t : textList) { context.write(new Text(key.toString() + "_" + Integer.toString(part)), t); } } }
2条答案
按热度按时间eulz3vhy1#
如果您知道哪些键的值将非常大,那么可以使用以下技巧。
您可以实现自定义
Partitioner
这将确保每个倾斜的密钥都被分配到一个分区,然后其他所有的密钥都将被分配到剩余的分区hashCode
(默认值是多少HashPartitioner
是的)。您可以创建自定义
Partitioner
通过实现此接口:然后告诉hadoop使用
Partitioner
使用:t1qtbnec2#
也许你可以在打减速器之前用一个组合器?这是相当投机的。。。
其思想是将每组密钥划分为预设最大大小的分区,然后将这些分区的k/v对输出到reducer。此代码假定您在配置中的某个位置设置了该大小。