将csv文件转换为hdfs上的另一种csv格式

nue99wik  于 2021-06-04  发布在  Hadoop
关注(0)|答案(2)|浏览(448)

我必须实现一个csv文件转换器才能在hadoop集群上运行。主要线路有:
我在hdfs上有一堆csv文件,包含任意内容。
我知道如何使用java代码将它们转换成“标准”的(即具有指定的行)。
转换需要一些参数(大约10或15),每个文件的参数不同。
我不介意对输出文件进行分段。
但我希望他们有一个 input-filename[##].csv 命名以区分它们,以便以后进行处理/可视化。
我的问题是:最好的办法是什么?
作为hadoop的新手,我正在考虑使用map reduce来实现这一点,但是我对输出格式有一些问题。另一方面,我可以使用spark(在scala中使用java代码)。似乎很容易编码,但我不知道怎么做。
对要执行的主要任务的指针意见,来自(更多)有经验的用户将不胜感激。

zz2j4svz

zz2j4svz1#

Spark是个不错的选择。它为您提供了更大的灵活性和快速处理。

g52tjvyc

g52tjvyc2#

使用spark确实很简单:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.hadoop.fs.FileUtil;

import java.io.File;

public class Converter {
    static String appName = "CSV-Conversion";  // spark app name
    static String master = "local";            // spark master 

    JavaSparkContext sc;

    /**
     * Init spark context
     */
    public Converter(){
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        sc = new JavaSparkContext(conf);
    }

    /**
     * The conversion using spark
     */
    public void convertFile(String inputFile, String outputDir){
        JavaRDD<String> inputRdd = sc.textFile(inputFile);
        JavaRDD<String> outputRdd = inputRdd.map(Converter::convertLine);
        outputRdd.saveAsTextFile(outputDir);
    }

    /**
     * The function that convert each file line.
     *
     * It is static (i.e. does not requires 'this') and does not use other object.
     * If external objects (or not static method) are required, they must be
     * serializable so that a copy can be send to each worker node.
     * It is however better to avoid or at least minimize such data transfer.
     */
    public static String convertLine(String line){
        return line.toUpperCase();
    }

    /**
     * As a stand-alone app
     */
    public static void main(String[] args){
        if(args.length!=2) {
            System.out.println("Invalid number of arguments. Usage: Converter inputFile outputDir");
            System.exit(1);
        }

        String inputFile = args[0];
        String outputDir = args[1];

        FileUtil.fullyDelete(new File(outputDir));

        Converter c = new Converter();
        c.convertFile(inputFile,outputDir);
    }
}

我在github中为它做了一个简单的maven项目

相关问题