mapreduce cassandra wordcount编译错误:找不到confighelper

r1zk6ea1  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(353)

我试图运行wordcount mapreduce程序来读取和计算存储在cassandra表(列族)中的数据,但是,当我编译程序时,重复出现了相同的错误。下面是我的源代码和错误我得到。有人能帮我解决这个问题吗?提前谢谢。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;

/**
 * This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1".
 *
 * Output is written to a text file.
 */
public class WordCountCounters extends Configured implements Tool
{
   private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);

     static final String COUNTER_COLUMN_FAMILY = "input_words";
     private static final String OUTPUT_PATH_PREFIX = "/Users/Deepu/Documents/dse-3.2.4/dse-data/word_count_counters";

     public static void main(String[] args) throws Exception
    {
     // Let ToolRunner handle generic command-line options
     ToolRunner.run(new Configuration(), new WordCountCounters(), args);
     System.exit(0);
    }

public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
    {
        long sum = 0;
        for (IColumn column : columns.values())
        {
            logger.debug("read " + key + ":" + column.name() + " from " + context.getInputSplit());
            sum += ByteBufferUtil.toLong(column.value());
        }
        context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum));
    }
}

public int run(String[] args) throws Exception
{
    Job job = new Job(getConf(), "wordcountcounters");
    job.setJarByClass(WordCountCounters.class);
    job.setMapperClass(SumMapper.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));

    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY);
    SlicePredicate predicate = new SlicePredicate().setSlice_range(
                                                                    new SliceRange().
                                                                    setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER).
                                                                    setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER).
                                                                    setCount(100));
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    job.waitForCompletion(true);
    return 0;
 }
}

补偿误差包括:

1l5u6lss

1l5u6lss1#

因为你可能把这两行注解掉了:

//import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
//import org.apache.cassandra.hadoop.ConfigHelper;

相关问题