使用Apache Spark在Cassandra中批量插入挂起,并且从Web Ser触发时上下文未关闭

fnatzsnv  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(95)

我是一个新的Spark。我试图插入csv文件到cassandra表使用Spark-cassandra连接器如下:文件在Hdfs中,我获取所有文件的路径,对于每个路径,我调用一个方法,该方法将csv数据转换为相应的cassandra数据类型,并创建一个预处理语句,将数据绑定到预处理语句,然后将其添加到一个批处理中。最后,当批处理达到1000时,我执行该批处理。关键点1.我使用的是Apache Cassandra 2.1.8和Spark 1.5 2.我使用Spark Context 3读取了Csv文件。我正在使用com.datastax.spark.connector.cql. Cassandra连接器创建与Cassandra的会话。
我有9个文件,每个文件的数据都在cassandra的一个表中。每件事都很好,所有的插入都按预期发生,当我在spark提交上提交jar时,工作就完成了。
我面临的问题是当通过Web服务调用同一个Jar时(Web服务调用脚本来调用Jar),其中一个文件的数据没有被插入,Spark上下文也没有停止,因为作业一直在运行。
当我插入4个文件或5个文件时,即使通过Web服务,一切都工作正常。但它挂在一起,我在其中一个表中少了10条记录,上下文没有停止。
它的奇怪,因为当我提交的jar上的Spark提交直接一切工作正常,并通过网络服务,我面临着这个问题,其奇怪的bcz甚至网络服务提交的工作,以相同的Spark提交。
这是我的代码

package com.pz.loadtocassandra;

 import java.io.File;
import java.io.IOException;
 import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
 import java.util.List;
import java.util.Map;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.pz.shared.UnicodeBOMInputStream;
import com.pz.shared.fileformat.Header;
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat;
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable;

 public class LoadToCassandra {

public static final String STUDYID = "STUDYID";
public static final String PROJECTNAME = "PROJECTNAME";
public static final String FILEID = "FILEID";
public static int count = 0;
public static final String FILE_SERPERATOR = "/";
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName());
public static void main(String[] args) {
        String propFileLoc = args[0];
        String hdfsHome = args[1];
        String hdfs_DtdXmlPath = args[2];
        String hdfs_NormalizedDataPath = args[3];

        run(propFileLoc, hdfsHome,     hdfs_DtdXmlPath,hdfs_NormalizedDataPath);
    } catch (IOException exception) {
        log.log(Level.SEVERE, "Error occur in FileHandler.", exception);
    }
}

public static void run(String propFileLoc, String hdfsHome,
        String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) {
    JavaSparkContext ctx = null;
    FileSystem hadoopFs = null;
    try {

        PropInitialize.initailizeConfig(propFileLoc);
        //setting spark context
        ctx = setSparkContext(propFileLoc);
        ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath);
        Configuration configuration = setHadoopConf();
        hadoopFs = getHadoopFs(hdfsHome, configuration);
        FileStatus[] fstat = hadoopFs.listStatus(new Path(hdfs_NormalizedDataPath));
        //Getting the csv paths
        Path[] paths = FileUtil.stat2Paths(fstat);
        log.info("PATH.size - " + paths.length);
        for (Path path : paths) {
            log.info("path is : "+path.toString());
            loadToCassandra(propFileLoc, path, configuration,hdfsHome, ctx);
        }

    } catch (IOException | URISyntaxException e) {
        log.log(Level.SEVERE, "run method", e);
        e.printStackTrace();
    } finally {
        log.info("finally ");
        if (ctx!= null) {
            ctx.stop();
            System.out.println("SC Stopped");
        }
        if (hadoopFs != null) {
            try {
                hadoopFs.close();
            } catch (IOException e) {
                log.log(Level.SEVERE, "run method", e);
            }
        }
    }
}

// input : 1. String hdfs home ,
// 2. Configuration hadoop conf object
// returns : hadoop File System object
private static FileSystem getHadoopFs(String hdfsHome,
        Configuration configuration) throws IOException, URISyntaxException {
    return FileSystem.get(new URI(hdfsHome), configuration);

}

// input : no inputs
// process : sets hadoop config parameters
// output : retuns hadoop conf object
private static Configuration setHadoopConf() throws IOException,
        URISyntaxException {
    Configuration configuration = new Configuration();
    configuration.setBoolean("csvFileFormat.encoded.flag", true);
    configuration.set("csvinputformat.token.delimiter", ",");
    return configuration;

}

// input : string Properties File Location
// process : creates and sets the configurations of spark context
// retuns : JavaSparkContext object with configurations set to it.
private static JavaSparkContext setSparkContext(String propFileLoc) {
    PropInitialize.initailizeConfig(propFileLoc);
    SparkConf conf = new SparkConf();
    conf.set("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer");
    conf.setAppName("Loading Data");
    conf.setMaster(PropInitialize.spark_master);
    conf.set("spark.cassandra.connection.host",
            PropInitialize.cassandra_hostname);
    conf.setJars(PropInitialize.external_jars);
    return new JavaSparkContext(conf);

}

private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath,
        Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) {
    System.out.println("File :: " + sourceFileHdfsPath.toString());
    FileSystem hadoopFs = null;
    PropInitialize.initailizeConfig(propFileLoc);
    String cassKeyspaceName = PropInitialize.cass_keyspace_name;
    log.info("entered here for file "+sourceFileHdfsPath.toString());

    final String strInputFileName = StringUtils.split(
            sourceFileHdfsPath.getName(), "#")[0].toLowerCase();
    final String strTableNameInCass = StringUtils.split(
            sourceFileHdfsPath.getName(), "-")[0].split("#")[1]
            .toLowerCase();

    final String strSourceFilePath = sourceFileHdfsPath.toString();

    try {
        hadoopFs = getHadoopFs(hdfsHome, hadoopConf);

        //getting the cassandra connection using spark conf
        final CassandraConnector connector = getCassandraConnection(ctx);

         final JavaRDD<CassandraRow> cassTableObj=getCassTableObj(ctx,cassKeyspaceName,strTableNameInCass);

        final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes;

        final String headersUpdated;
        final String headers;

        UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
                hadoopFs.open(sourceFileHdfsPath));
        Header CsvHeader = Header.getCSVHeader(ubis, ",");
        if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) {

            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;

            headers = fString; // StringUtils.join(stringArr.toString(),",");

            headersUpdated = strTableNameInCass.toUpperCase() + "ID,"
                    + headers;

        } else {

            String[] stringArr = new String[CsvHeader.size()];
            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                // stringArr[i] = CsvHeader.get(i).ColumnName;
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;
            headers = StringUtils.join(stringArr.toString(), ",");
            headersUpdated = fString;

        }

        ubis.close();

        //Reading the file using spark context
        JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx
                .newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class,
                        LongWritable.class, TextArrayWritable.class,
                        hadoopConf);

        final long recCount = fileRdd.count();

        final String[] actCols = headersUpdated.split(",");

        final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>();
        final List<String> colNameAndType = tabColMapWithColTypes1
                .get(strTableNameInCass.toUpperCase());

        for (int i = 0; i < actCols.length; i++) {

            if (colNameAndType.contains(actCols[i] + " " + "text")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "text");

                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);

            } else if (colNameAndType
                    .contains(actCols[i] + " " + "decimal")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "decimal");
                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);
            } else {
                continue;
            }

        }

        //creates the query for prepared statement
        final String makeStatement = makeSt(cassKeyspaceName,
                strTableNameInCass, actCols);
        final long seqId1 = cassTableObj.count();

        //calling map on the fileRdd 
        JavaRDD<String> data = fileRdd.values().map(
                new Function<TextArrayWritable, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;
                    Session session;
                    boolean isssession = false;
                    PreparedStatement statement;
                    BatchStatement batch;
                    int lineCount = 0;

                    long seqId = seqId1;

                    /*for each line returned as an TextArrayWritable convert each cell the corresponding
                     * bind the data to prepared statement
                     * add it to batch
                     */
                    @Override
                    public String call(TextArrayWritable tup)
                            throws Exception {
                        seqId++;
                        lineCount++;

                        log.info("entered here 3 for file "+strSourceFilePath.toString());
                        String[] part = tup.toStrings();

                        Object[] parts = getDataWithUniqueId(
                                strTableNameInCass, part);

                        //For each file
                        //Creates the session
                        //creates the PreparedStatement
                        if (!isssession) {
                            session = connector.openSession();
                            statement = session.prepare(makeStatement);
                            log.info("entered here 4 for file "+strSourceFilePath.toString());
                            // System.out.println("statement :" +
                            // statement);
                            isssession = true;
                            batch = new BatchStatement();
                        }

                        List<Object> typeConvData = new ArrayList<Object>();

                        for (int i = 0; i < parts.length; i++) {
                            String type = mapOfColNameAndType.get(i);
                            try {
                                if (type.equalsIgnoreCase("text")) {

                                    typeConvData.add(parts[i]);
                                } else {

                                    // parts[i] =
                                    // parts[i].toString().replace("\"",
                                    // "");
                                    // check if the String the has to
                                    // converted to a BigDecimal is any
                                    // positive or negetive integer or not.
                                    // if its not a positive integer or
                                    // negative forcefully convert it to
                                    // zero (avoiding NumberFormatException)
                                    if (!((String) parts[i])
                                            .matches("-?\\d+")) {
                                        parts[i] = "0";
                                    }
                                    long s = Long
                                            .valueOf((String) parts[i]);
                                    typeConvData.add(BigDecimal.valueOf(s));

                                }
                            } catch (NullPointerException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);

                            } catch (NumberFormatException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            } catch (InvalidTypeException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            }
                        }

                                                    List<Object> data = typeConvData;

                        //bind data to query
                        final BoundStatement query = statement.bind(data
                                .toArray(new Object[data.size()]));

                        //add query to batch
                        batch.add(query);
                        int count = LoadToCassandra.count;

                        //when count is 1k execute batch
                        if (count == 1000) {

                            log.info("entered here 5 for file "+strSourceFilePath.toString());
                            log.info("batch done");
                            session.execute(batch);
                            LoadToCassandra.count = 0;
                            batch = new BatchStatement();
                            return StringUtils.join(tup.toStrings());
                        }

                        //if its the last batch and its not of size 1k
                        if (lineCount == (recCount))
                            {
                            log.info("Last Batch");
                            session.executeAsync(batch);
                            log.info("entered here 6 for file "+strSourceFilePath.toString());
                            //session.execute(batch);
                            session.close();
                            log.info("Session closed");
                        }

                        LoadToCassandra.count++;
                        return StringUtils.join(tup.toStrings());
                    }

                    private Object[] getDataWithUniqueId(
                            String strTableNameInCass, String[] part) {
                        Object[] parts = null;
                        ArrayList<String> tempArraylist = new ArrayList<String>();
                        if (!strTableNameInCass
                                .equalsIgnoreCase("PCMASTER")) {
                            for (int i = 0; i < part.length; i++) {
                                if (i == 0) {
                                    tempArraylist.add(0,
                                            String.valueOf(seqId));
                                }
                                tempArraylist.add(part[i]);
                            }
                            parts = tempArraylist.toArray();
                        } else {
                            parts = part;
                        }

                        return parts;
                    }

                });

        data.count();
        hadoopFs.close();

    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static JavaRDD<CassandraRow> getCassTableObj(
        JavaSparkContext ctx, String cassKeyspaceName,
        String strTableNameInCass) {
    return javaFunctions(ctx)
            .cassandraTable(cassKeyspaceName,
                    strTableNameInCass.toLowerCase());
}

private static CassandraConnector getCassandraConnection(
        JavaSparkContext ctx) {
    return CassandraConnector.apply(ctx.getConf());

}

private static String makeSt(String keyspace, String tabName,
        String[] colNames) {
    StringBuilder sb = new StringBuilder();
    sb.append("INSERT INTO " + keyspace + "." + tabName + " ( ");
    List<String> vars = new ArrayList<>();
    for (int i = 0; i < (colNames.length - 1); i++) {
        sb.append(colNames[i] + ",");
        vars.add("?");
    }
    vars.add("?");
    sb.append(colNames[colNames.length - 1] + " ) values ( "
            + StringUtils.join(vars, ",") + " ) ");

    return sb.toString();
   }}

有谁能告诉我是什么原因导致了这个问题,以及如何解决它。谢谢

pinkon5k

pinkon5k1#

一旦你把你的数据插入cassandra,调用ctx.stop()方法,它就会停止spark context。

相关问题