I am new to Spark. I am trying to insert CSV files into Cassandra tables using the Spark-Cassandra connector as follows: the files live in HDFS; I get the paths of all the files, and for each path I call a method that converts the CSV data to the corresponding Cassandra data types, creates a prepared statement, binds the data to the prepared statement, and adds it to a batch. When the batch reaches 1000 statements, I execute it. Key points: 1. I am using Apache Cassandra 2.1.8 and Spark 1.5. 2. I read the CSV files with the Spark context. 3. I create the session with Cassandra using com.datastax.spark.connector.cql.CassandraConnector.
I have 9 files, and each file's data goes into its own Cassandra table. Everything works fine and all the inserts happen as expected when I submit the jar with spark-submit; the job completes.
The problem I am facing is that when the same jar is invoked through a web service (the web service calls a script that launches the jar), the data of one of the files is not inserted and the Spark context never stops, because the job keeps running.
When I load only 4 or 5 files, everything works fine even through the web service. But with all of them the job hangs, I am missing 10 records in one of the tables, and the context does not stop.
The strange thing is that when I submit the jar directly with spark-submit everything works, while through the web service I hit this problem, even though the web service launches the job with the same spark-submit.
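To make the full code below easier to follow, here is a condensed sketch of the pattern I just described (the keyspace, table and column names in it are only illustrative placeholders, not my real schema):
// Condensed sketch of the insert pattern described above.
// my_ks, my_table and the column names are illustrative placeholders.
import java.math.BigDecimal;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class BatchInsertSketch {
    // rows: one String[] per CSV line; connector: obtained via CassandraConnector.apply(ctx.getConf())
    public static void insert(JavaRDD<String[]> rows, final CassandraConnector connector) {
        rows.map(new Function<String[], String>() {
            private static final long serialVersionUID = 1L;
            private transient Session session;
            private transient PreparedStatement statement;
            private transient BatchStatement batch;
            private int pending = 0;

            @Override
            public String call(String[] parts) {
                if (session == null) {
                    // one session and one prepared statement per task
                    session = connector.openSession();
                    statement = session.prepare(
                            "INSERT INTO my_ks.my_table (id, value) VALUES (?, ?)");
                    batch = new BatchStatement();
                }
                // convert the cells, bind them and add the bound statement to the batch
                batch.add(statement.bind(parts[0], new BigDecimal(parts[1])));
                if (++pending == 1000) {
                    // flush every 1000 rows
                    session.execute(batch);
                    batch = new BatchStatement();
                    pending = 0;
                }
                // in my full code the remaining rows are flushed and the session is
                // closed when the last record of the file has been processed
                return parts[0];
            }
        }).count(); // an action to force the map to run
    }
}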
Here is my code:
package com.pz.loadtocassandra;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import com.pz.shared.UnicodeBOMInputStream;
import com.pz.shared.fileformat.Header;
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat;
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable;
public class LoadToCassandra {
public static final String STUDYID = "STUDYID";
public static final String PROJECTNAME = "PROJECTNAME";
public static final String FILEID = "FILEID";
public static int count = 0;
public static final String FILE_SERPERATOR = "/";
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName());
public static void main(String[] args) {
    String propFileLoc = args[0];
    String hdfsHome = args[1];
    String hdfs_DtdXmlPath = args[2];
    String hdfs_NormalizedDataPath = args[3];
    try {
        // assumption: the logging FileHandler setup that belongs here was truncated
        // when pasting; a default FileHandler keeps the snippet compilable
        log.addHandler(new FileHandler());
        run(propFileLoc, hdfsHome, hdfs_DtdXmlPath, hdfs_NormalizedDataPath);
    } catch (IOException exception) {
        log.log(Level.SEVERE, "Error occurred in FileHandler.", exception);
    }
}
public static void run(String propFileLoc, String hdfsHome,
String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) {
JavaSparkContext ctx = null;
FileSystem hadoopFs = null;
try {
PropInitialize.initailizeConfig(propFileLoc);
//setting spark context
ctx = setSparkContext(propFileLoc);
ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath);
Configuration configuration = setHadoopConf();
hadoopFs = getHadoopFs(hdfsHome, configuration);
FileStatus[] fstat = hadoopFs.listStatus(new Path(hdfs_NormalizedDataPath));
//Getting the csv paths
Path[] paths = FileUtil.stat2Paths(fstat);
log.info("PATH.size - " + paths.length);
for (Path path : paths) {
log.info("path is : "+path.toString());
loadToCassandra(propFileLoc, path, configuration,hdfsHome, ctx);
}
} catch (IOException | URISyntaxException e) {
log.log(Level.SEVERE, "run method", e);
e.printStackTrace();
} finally {
log.info("finally ");
if (ctx != null) {
ctx.stop();
System.out.println("SC Stopped");
}
if (hadoopFs != null) {
try {
hadoopFs.close();
} catch (IOException e) {
log.log(Level.SEVERE, "run method", e);
}
}
}
}
// input : 1. String hdfs home ,
// 2. Configuration hadoop conf object
// returns : hadoop File System object
private static FileSystem getHadoopFs(String hdfsHome,
Configuration configuration) throws IOException, URISyntaxException {
return FileSystem.get(new URI(hdfsHome), configuration);
}
// input : no inputs
// process : sets hadoop config parameters
// output : retuns hadoop conf object
private static Configuration setHadoopConf() throws IOException,
URISyntaxException {
Configuration configuration = new Configuration();
configuration.setBoolean("csvFileFormat.encoded.flag", true);
configuration.set("csvinputformat.token.delimiter", ",");
return configuration;
}
// input : string Properties File Location
// process : creates and sets the configurations of spark context
// retuns : JavaSparkContext object with configurations set to it.
private static JavaSparkContext setSparkContext(String propFileLoc) {
PropInitialize.initailizeConfig(propFileLoc);
SparkConf conf = new SparkConf();
conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
conf.setAppName("Loading Data");
conf.setMaster(PropInitialize.spark_master);
conf.set("spark.cassandra.connection.host",
PropInitialize.cassandra_hostname);
conf.setJars(PropInitialize.external_jars);
return new JavaSparkContext(conf);
}
private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath,
Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) {
System.out.println("File :: " + sourceFileHdfsPath.toString());
FileSystem hadoopFs = null;
PropInitialize.initailizeConfig(propFileLoc);
String cassKeyspaceName = PropInitialize.cass_keyspace_name;
log.info("entered here for file "+sourceFileHdfsPath.toString());
final String strInputFileName = StringUtils.split(
sourceFileHdfsPath.getName(), "#")[0].toLowerCase();
final String strTableNameInCass = StringUtils.split(
sourceFileHdfsPath.getName(), "-")[0].split("#")[1]
.toLowerCase();
final String strSourceFilePath = sourceFileHdfsPath.toString();
try {
hadoopFs = getHadoopFs(hdfsHome, hadoopConf);
//getting the cassandra connection using spark conf
final CassandraConnector connector = getCassandraConnection(ctx);
final JavaRDD<CassandraRow> cassTableObj=getCassTableObj(ctx,cassKeyspaceName,strTableNameInCass);
final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes;
final String headersUpdated;
final String headers;
UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
hadoopFs.open(sourceFileHdfsPath));
Header CsvHeader = Header.getCSVHeader(ubis, ",");
if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) {
String fString = "";
for (int i = 0; i < CsvHeader.size() - 1; i++) {
fString = fString + CsvHeader.get(i).ColumnName + ",";
}
fString = fString
+ CsvHeader.get(CsvHeader.size() - 1).ColumnName;
headers = fString; // StringUtils.join(stringArr.toString(),",");
headersUpdated = strTableNameInCass.toUpperCase() + "ID,"
+ headers;
} else {
String[] stringArr = new String[CsvHeader.size()];
String fString = "";
for (int i = 0; i < CsvHeader.size() - 1; i++) {
// stringArr[i] = CsvHeader.get(i).ColumnName;
fString = fString + CsvHeader.get(i).ColumnName + ",";
}
fString = fString
+ CsvHeader.get(CsvHeader.size() - 1).ColumnName;
headers = StringUtils.join(stringArr.toString(), ",");
headersUpdated = fString;
}
ubis.close();
//Reading the file using spark context
JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx
.newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class,
LongWritable.class, TextArrayWritable.class,
hadoopConf);
final long recCount = fileRdd.count();
final String[] actCols = headersUpdated.split(",");
final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>();
final List<String> colNameAndType = tabColMapWithColTypes1
.get(strTableNameInCass.toUpperCase());
for (int i = 0; i < actCols.length; i++) {
if (colNameAndType.contains(actCols[i] + " " + "text")) {
int indexOfColName = colNameAndType.indexOf(actCols[i]
+ " " + "text");
mapOfColNameAndType.put(i,
colNameAndType.get(indexOfColName).split(" ")[1]);
} else if (colNameAndType
.contains(actCols[i] + " " + "decimal")) {
int indexOfColName = colNameAndType.indexOf(actCols[i]
+ " " + "decimal");
mapOfColNameAndType.put(i,
colNameAndType.get(indexOfColName).split(" ")[1]);
} else {
continue;
}
}
//creates the query for prepared statement
final String makeStatement = makeSt(cassKeyspaceName,
strTableNameInCass, actCols);
final long seqId1 = cassTableObj.count();
//calling map on the fileRdd
JavaRDD<String> data = fileRdd.values().map(
new Function<TextArrayWritable, String>() {
private static final long serialVersionUID = 1L;
Session session;
boolean isssession = false;
PreparedStatement statement;
BatchStatement batch;
int lineCount = 0;
long seqId = seqId1;
/*
 * For each line returned as a TextArrayWritable: convert each cell to the
 * corresponding Cassandra type, bind the data to the prepared statement
 * and add it to the batch.
 */
@Override
public String call(TextArrayWritable tup)
throws Exception {
seqId++;
lineCount++;
log.info("entered here 3 for file "+strSourceFilePath.toString());
String[] part = tup.toStrings();
Object[] parts = getDataWithUniqueId(
strTableNameInCass, part);
//For each file
//Creates the session
//creates the PreparedStatement
if (!isssession) {
session = connector.openSession();
statement = session.prepare(makeStatement);
log.info("entered here 4 for file "+strSourceFilePath.toString());
// System.out.println("statement :" +
// statement);
isssession = true;
batch = new BatchStatement();
}
List<Object> typeConvData = new ArrayList<Object>();
for (int i = 0; i < parts.length; i++) {
String type = mapOfColNameAndType.get(i);
try {
if (type.equalsIgnoreCase("text")) {
typeConvData.add(parts[i]);
} else {
// parts[i] =
// parts[i].toString().replace("\"",
// "");
// check whether the String that has to be converted to a
// BigDecimal is a positive or negative integer; if it is not,
// force it to zero (avoiding a NumberFormatException)
if (!((String) parts[i])
.matches("-?\\d+")) {
parts[i] = "0";
}
long s = Long
.valueOf((String) parts[i]);
typeConvData.add(BigDecimal.valueOf(s));
}
} catch (NullPointerException e) {
log.log(Level.SEVERE, "loadToCass method", e);
} catch (NumberFormatException e) {
log.log(Level.SEVERE, "loadToCass method", e);
} catch (InvalidTypeException e) {
log.log(Level.SEVERE, "loadToCass method", e);
}
}
List<Object> data = typeConvData;
//bind data to query
final BoundStatement query = statement.bind(data
.toArray(new Object[data.size()]));
//add query to batch
batch.add(query);
int count = LoadToCassandra.count;
//when count is 1k execute batch
if (count == 1000) {
log.info("entered here 5 for file "+strSourceFilePath.toString());
log.info("batch done");
session.execute(batch);
LoadToCassandra.count = 0;
batch = new BatchStatement();
return StringUtils.join(tup.toStrings());
}
// if it's the last batch and it is not of size 1k
if (lineCount == (recCount))
{
log.info("Last Batch");
session.executeAsync(batch);
log.info("entered here 6 for file "+strSourceFilePath.toString());
//session.execute(batch);
session.close();
log.info("Session closed");
}
LoadToCassandra.count++;
return StringUtils.join(tup.toStrings());
}
private Object[] getDataWithUniqueId(
String strTableNameInCass, String[] part) {
Object[] parts = null;
ArrayList<String> tempArraylist = new ArrayList<String>();
if (!strTableNameInCass
.equalsIgnoreCase("PCMASTER")) {
for (int i = 0; i < part.length; i++) {
if (i == 0) {
tempArraylist.add(0,
String.valueOf(seqId));
}
tempArraylist.add(part[i]);
}
parts = tempArraylist.toArray();
} else {
parts = part;
}
return parts;
}
});
data.count();
hadoopFs.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static JavaRDD<CassandraRow> getCassTableObj(
JavaSparkContext ctx, String cassKeyspaceName,
String strTableNameInCass) {
return javaFunctions(ctx)
.cassandraTable(cassKeyspaceName,
strTableNameInCass.toLowerCase());
}
private static CassandraConnector getCassandraConnection(
JavaSparkContext ctx) {
return CassandraConnector.apply(ctx.getConf());
}
private static String makeSt(String keyspace, String tabName,
String[] colNames) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + keyspace + "." + tabName + " ( ");
List<String> vars = new ArrayList<>();
for (int i = 0; i < (colNames.length - 1); i++) {
sb.append(colNames[i] + ",");
vars.add("?");
}
vars.add("?");
sb.append(colNames[colNames.length - 1] + " ) values ( "
+ StringUtils.join(vars, ",") + " ) ");
return sb.toString();
}
}
Can anyone tell me what is causing this problem and how to resolve it? Thanks.
1 Answer
Once you have inserted your data into Cassandra, call the ctx.stop() method; it will stop the Spark context.
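For example, a minimal sketch that reuses the variable names from the run() method in your code:
JavaSparkContext ctx = null;
try {
    ctx = setSparkContext(propFileLoc);
    for (Path path : paths) {
        loadToCassandra(propFileLoc, path, configuration, hdfsHome, ctx);
    }
} finally {
    if (ctx != null) {
        ctx.stop(); // stop the Spark context once every file has been loaded
    }
}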