我是Spark的新手,我正在EMR集群上运行spark作业。完成这项工作大约需要一个小时。
在job中,有一段代码用于从S3阅读文件(文件大小为10MB)并将其内容写入dynamo db。
我不明白为什么要花这么长时间才能完成这项任务?我以为它会更快地完成。
下面是Spark Job的代码:
public class PopulateCovid19Citations {
private static final String APP_NAME = "PopulateCovid19Citations";
private static Logger LOGGER = LogManager.getLogger(PopulateCovid19Citations.class);
static Logger log = Logger.getLogger(PopulateCovid19Citations.class.getName());
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
// Building the Spark session
JavaSparkContext javaSparkContext = SparkConfiguration.buildSparkContext(APP_NAME);
SparkSession sparkSession = SparkSession.builder()
.sparkContext(javaSparkContext.sc()).getOrCreate();
String fileLocationTest = "s3a://mybucket/WHOCovid19CitationsDatabase.csv";
Dataset<Row> fullCitations =
sparkSession.read().format("com.databricks.spark.csv").option("inferSchema", "true")
.option("header", "true").load(fileLocationTest);
fullCitations.show(10);
// Selecting only the relevant columns for our exercise
Dataset<Row> citations = fullCitations.select(
col("Title"),
col("Authors"),
col("Abstract"),
col("Published Year"),
col("Journal"),
col("Study"),
col("Tags"));
citations.show(10);
// Removing citations with null title
Dataset<Row> filteredCitations = citations.filter(col("Title").isNotNull());
// Building a RDD composed of DynamoDB writable items that matches the Covid19Citation table
JavaPairRDD<Text, DynamoDBItemWritable> dynamoCitations = filteredCitations.javaRDD().mapToPair(citation -> {
Map<String, AttributeValue> attributes = new HashMap<>();
putStringAttribute(attributes, "citationCode", UUID.randomUUID().toString());
putStringAttribute(attributes, "title", citation.getAs("Title"));
putStringAttribute(attributes, "authors", citation.getAs("Authors"));
putStringAttribute(attributes, "abstract", citation.getAs("Abstract"));
putNumberAttribute(attributes, "publishedYear", citation.getAs("Published Year"));
putStringAttribute(attributes, "journal", citation.getAs("Journal"));
putStringAttribute(attributes, "study", citation.getAs("Study"));
putStringAttribute(attributes, "tags", citation.getAs("Tags"));
DynamoDBItemWritable dynamoDBItemWritable = new DynamoDBItemWritable();
dynamoDBItemWritable.setItem(attributes);
return new Tuple2<>(new Text(""), dynamoDBItemWritable);
});
// Writing data to the DynamoDB table
JobConf jobConf = SparkConfiguration.buildJobConf(javaSparkContext, Boolean.FALSE);
dynamoCitations.saveAsHadoopDataset(jobConf);
sparkSession.stop();
}
private static void putStringAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
if (fieldValue != null) {
attributes.put(key, new AttributeValue(fieldValue.toString()));
}
}
private static void putNumberAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
if (fieldValue != null) {
try {
Integer integerFieldValue = Integer.parseInt(fieldValue.toString());
AttributeValue attributeValue = new AttributeValue();
attributeValue.setN(integerFieldValue.toString());
attributes.put(key, attributeValue);
} catch (Exception e) {
LOGGER.info("cannot convert " + fieldValue + " to integer");
}
}
}
}
下面是构建Spark上下文的配置:
public static JavaSparkContext buildSparkContext(String application) throws ClassNotFoundException {
SparkConf conf = new SparkConf()
.setAppName(application)
.set("spark.executor.cores", "5")
.set("spark.executor.memory", "42G")
.set("spark.executor.memoryOverhead", "4G")
.set("spark.driver.memory", "42G")
.set("spark.driver.cores", "5")
.set("spark.executor.instances", "25")
.set("spark.default.parallelism", "250")
.set("spark.dynamicAllocation.enabled", "false")
.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
});
return new JavaSparkContext(conf);
}
我正在使用带有m6a.16xlarge(64vcore,259GB内存)的EMR集群用于EC2示例类型。
有人能帮我一下吗?
先谢谢你了。
3条答案
按热度按时间h7appiyu1#
CSV模式推断是通过“读取整个文件并计算出每列的类型”来实现的。有一点取样,但实际上是一样的:1个或多个HTTP GET请求,在任何计算开始之前 * 流经整个文件 *
理想情况下,将CSV转换为具有良好定义模式的现代文件格式,例如Avro。如果您无法做到这一点,请在应用中显式声明csv文件的架构。
这可能不是您的问题的唯一原因,但它将是处理中的第一个瓶颈
jw5wzhpr2#
如果对相同的数据进行迭代运算,则应使用
persist()
方法,以避免重复计算。我建议在创建
fullCitations
数据集后至少将其持久化。有关持久性的更多信息,请参阅文档:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
另外,我会检查Spark配置。您有一个具有64个vcore和259GB RAM的群集,但您声明了25个executors,每个executors具有5个核心和42GB RAM。我不确定每个执行器是否需要这么多资源,但是如果集群没有足够的资源来启动所有执行器,可能会影响性能。
6uxekuva3#
我终于找到了问题所在。在yarn log中,我看到这样的行:
结论是数据库是我的瓶颈。似乎我们超出了dynamodb表的吞吐量。为了解决这个问题,我需要更改dynamodb的预配配置。