使用map reduce在cassandra中执行批量加载

4nkexdtk  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(418)

我和Cassandra合作的经验不多,如果我的方法不对,请原谅。
我正在尝试用map reduce在cassandra进行批量加载
基本上是字数计算的例子
参考文献:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
我已经把简单的hadoop wordcountMap器的例子,并稍微修改了驱动程序代码和减速机根据上述例子。
我也成功地生成了输出文件。现在我的疑问是如何执行Cassandra部分加载?我的方法有什么不同吗?
请给我建议。
这是驱动程序代码的一部分

Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);

 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

Map器与普通的wordcountMap器相同,它只是标记并发出word,1
reducer类的形式如下

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
4dc9hkyq

4dc9hkyq1#

要批量装载到cassandra,我建议您从datastax看这篇文章。基本上你需要做两件事批量装载:
您的输出数据本机不适合cassandra,您需要将其转换为sstables。
一旦你有了sstables,你就需要能够将它们流到cassandra中。当然,您不只是想将每个sstable复制到每个节点,您只想将数据的相关部分复制到每个节点
在您使用 BulkOutputFormat ,当它使用 sstableloader 在幕后。我从来没用过 MultipleOutputs ,但它应该可以正常工作。
我认为你的错误在于你没有使用 MultipleOutputs 正确:你仍然在做一个 context.write ,当你真的应该写信给你的 MultipleOutputs 对象。你现在这样做,因为你是在给普通人写信 Context ,它将以默认的输出格式 TextOutputFormat 而不是你在书中定义的那个 MultipleOutputs . 有关如何使用 MultipleOutputs 在你的减速机里。
一旦写入到正确的输出格式 BulkOutputFormat 正如您所定义的,您的sstables应该从集群中的每个节点创建并流式传输到cassandra—您不需要任何额外的步骤,输出格式将为您处理它。
另外,我建议看这篇文章,他们也解释了如何使用 BulkOutputFormat ,但他们用的是 ConfigHelper 您可能需要查看它,以便更轻松地配置cassandra端点。

相关问题