我有一个非常简单的hadoop工作,使用cassandra作为输入和输出。以下是作业配置代码(无特殊说明):
Job job = new Job(getConf(), JOB_NAME);
job.setJarByClass(getClass());
job.setMapperClass(CassandraHadoopCounterMapper.class);
job.setReducerClass(CassandraHadoopCounterReducer.class);
job.setCombinerClass(CassandraHadoopCounterCombiner.class);
job.setInputFormatClass(CqlInputFormat.class);
job.setOutputFormatClass(CqlOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY, WIDE_ROWS);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
ConfigHelper.setOutputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET c = ?";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
//aditional properties:
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "2000");
ConfigHelper.setInputSplitSize(job.getConfiguration(), 4 * 64 * 1024);
我的输入cassandra表有10k行。在hadoop中我设置了 max mappers = 2
以及 max reducers = 2
在作业计数器中,我可以看到以下内容:
Map input records=4000
哪个是 InputCQLPageRowSize * mappers
如果 InputCQLPageRowSize
当时未设置 Map input records
等于2000(因为默认 InputCQLPageRowSize
是1000)
我的问题:如何使我的hadoop作业读取输入表中的所有行?
作业完全在我的电脑上本地运行。
我使用的是cassandrav2.0.11和hadoop v1.0.4
1条答案
按热度按时间ds97pgxw1#
我的问题与cassandra 2.0.11中的一个bug有关,该bug在底层cql查询运行中添加了一个奇怪的限制子句,以将数据读取到Map任务:
我把这个问题发到了Cassandra·吉拉:https://issues.apache.org/jira/browse/cassandra-9074
事实证明,该问题与cassandra 2.0.12中修复的以下错误密切相关:https://issues.apache.org/jira/browse/cassandra-8166