从mapreduce作业向配置单元添加分区

jhkqcmku  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(381)

我是新的Hive和mapreduce,将非常感谢您的回答,并提供一个正确的方法。
我已经定义了一个外部表 logs 在配置单元中,在日期和源服务器上分区,在hdfs上有一个外部位置 /data/logs/ . 我有一个mapreduce作业,它获取这些日志文件并将它们拆分,然后存储在上面提到的文件夹下。喜欢

"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...

在mapreduce作业中,我想向配置单元中的表日志添加分区。我知道这两种方法
alter table命令—alter table命令太多
添加动态分区
对于方法二,我只看到 INSERT OVERWRITE 这不是我的选择。有没有办法在作业结束后将这些新分区添加到表中?

50few1ms

50few1ms1#

要在map/reduce作业中实现这一点,我建议使用apachehcatalog,这是一个在hadoop下标记的新项目。
hcatalog实际上是hdfs之上的一个抽象层,因此您可以用标准化的方式编写输出,无论是从hive、pig还是m/r。在这里,您可以直接使用输出格式从map/reduce作业中加载hive中的数据 HCatOutputFormat . 下面是一个来自官方网站的例子。
当前用于为(a=1,b=1)写出特定分区的代码示例如下所示:

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

要写入多个分区,必须使用上述每个分区启动单独的作业。
您还可以在hcatalog中使用动态分区,在这种情况下,您可以在同一个作业中加载任意数量的分区!
我建议您在上面提供的网站上进一步阅读hcatalog,如果需要的话,可以提供更多的细节。

rdrgkggo

rdrgkggo2#

下面是使用hcatalog在一个作业中使用动态分区写入多个表的代码,该代码已在hadoop 2.5.0、hive 0.13.1上进行了测试:

// ... Job setup, InputFormatClass, etc ...
String dbName = null;
String[] tables = {"table0", "table1"};

job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class);

    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);

    HCatOutputFormat.setOutput(
        configurer.getJob(table), outputJobInfo
    );

    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);

    HCatOutputFormat.setSchema(
        configurer.getJob(table),
        schema
    );
}
configurer.configure();

return job.waitForCompletion(true) ? 0 : 1;

Map器:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(3); // Including partitions
        record.set(0, value.toString());

        // partitions must be set after non-partition fields
        record.set(1, "0"); // partition0=0
        record.set(2, "1"); // partition1=1

        MultiOutputFormat.write("table0", null, record, context);
        MultiOutputFormat.write("table1", null, record, context);
    }
}
wf82jlnq

wf82jlnq3#

事实上,事情比这复杂一点,这是不幸的,因为它是没有正式来源(截至目前),并需要几天的挫折才能弄清楚。
我发现,我需要执行以下操作才能让hcatalog mapreduce作业能够写入动态分区:
在我工作的记录编写阶段(通常是reducer),我必须手动将动态分区(hcatfieldschema)添加到hcatschema对象中。
问题是hcatoutputformat.gettableschema(config)实际上并不返回分区字段。它们需要手动添加

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);

相关问题