hadoop - Spark job takes an hour to process a 10 MB file

dy1byipe · posted 2023-06-21 in Hadoop

I'm new to Spark and I'm running a Spark job on an EMR cluster. The job takes about an hour to complete.
In the job there is a piece of code that reads a file from S3 (the file is 10 MB) and writes its contents to DynamoDB.
I don't understand why this takes so long to finish; I expected it to complete much faster.
Here is the code of the Spark job:

public class PopulateCovid19Citations {

    private static final String APP_NAME = "PopulateCovid19Citations";
    private static Logger LOGGER = LogManager.getLogger(PopulateCovid19Citations.class);
    static Logger log = Logger.getLogger(PopulateCovid19Citations.class.getName());

    public static void main(String[] args) throws Exception {

        Logger.getLogger("org").setLevel(Level.ERROR);

        // Building the Spark session
        JavaSparkContext javaSparkContext = SparkConfiguration.buildSparkContext(APP_NAME);
        SparkSession sparkSession = SparkSession.builder()
            .sparkContext(javaSparkContext.sc()).getOrCreate();

        String fileLocationTest = "s3a://mybucket/WHOCovid19CitationsDatabase.csv";
        Dataset<Row> fullCitations =
            sparkSession.read().format("com.databricks.spark.csv").option("inferSchema", "true")
                .option("header", "true").load(fileLocationTest);
        fullCitations.show(10);

        // Selecting only the relevant columns for our exercise
        Dataset<Row> citations = fullCitations.select(
            col("Title"),
            col("Authors"),
            col("Abstract"),
            col("Published Year"),
            col("Journal"),
            col("Study"),
            col("Tags"));
        citations.show(10);

        // Removing citations with null title
        Dataset<Row> filteredCitations = citations.filter(col("Title").isNotNull());

        // Building a RDD composed of DynamoDB writable items that matches the Covid19Citation table
        JavaPairRDD<Text, DynamoDBItemWritable> dynamoCitations = filteredCitations.javaRDD().mapToPair(citation -> {
            Map<String, AttributeValue> attributes = new HashMap<>();
            putStringAttribute(attributes, "citationCode", UUID.randomUUID().toString());
            putStringAttribute(attributes, "title", citation.getAs("Title"));
            putStringAttribute(attributes, "authors", citation.getAs("Authors"));
            putStringAttribute(attributes, "abstract", citation.getAs("Abstract"));
            putNumberAttribute(attributes, "publishedYear", citation.getAs("Published Year"));
            putStringAttribute(attributes, "journal", citation.getAs("Journal"));
            putStringAttribute(attributes, "study", citation.getAs("Study"));
            putStringAttribute(attributes, "tags", citation.getAs("Tags"));

            DynamoDBItemWritable dynamoDBItemWritable = new DynamoDBItemWritable();
            dynamoDBItemWritable.setItem(attributes);
            return new Tuple2<>(new Text(""), dynamoDBItemWritable);
        });

        // Writing data to the DynamoDB table
        JobConf jobConf = SparkConfiguration.buildJobConf(javaSparkContext, Boolean.FALSE);
        dynamoCitations.saveAsHadoopDataset(jobConf);
        sparkSession.stop();
    }

    private static void putStringAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            attributes.put(key, new AttributeValue(fieldValue.toString()));
        }
    }

    private static void putNumberAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            try {
                Integer integerFieldValue = Integer.parseInt(fieldValue.toString());
                AttributeValue attributeValue = new AttributeValue();
                attributeValue.setN(integerFieldValue.toString());
                attributes.put(key, attributeValue);
            } catch (Exception e) {
                LOGGER.info("cannot convert " + fieldValue + " to integer");
            }
        }
    }
}
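
SparkConfiguration.buildJobConf is not included here; roughly, it wires up the emr-dynamodb-hadoop connector output along the lines of the sketch below (the table name, region and write-throughput ratio shown are placeholders, not the actual values used):

import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch only; the real buildJobConf is not shown in this post.
public static JobConf buildJobConf(JavaSparkContext context, Boolean readOnly) {
    // readOnly presumably toggles read vs. write settings; ignored in this sketch
    JobConf jobConf = new JobConf(context.hadoopConfiguration());
    jobConf.set("dynamodb.output.tableName", "Covid19Citation");  // table name from the comment above
    jobConf.set("dynamodb.regionid", "us-east-1");                // placeholder region
    jobConf.set("dynamodb.throughput.write.percent", "0.5");      // fraction of provisioned write capacity to use
    jobConf.set("mapred.output.format.class",
            "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
    return jobConf;
}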

Here is the configuration that builds the Spark context:

public static JavaSparkContext buildSparkContext(String application) throws ClassNotFoundException {
    SparkConf conf = new SparkConf()
            .setAppName(application)
            .set("spark.executor.cores", "5")
            .set("spark.executor.memory", "42G")
            .set("spark.executor.memoryOverhead", "4G")
            .set("spark.driver.memory", "42G")
            .set("spark.driver.cores", "5")
            .set("spark.executor.instances", "25")
            .set("spark.default.parallelism", "250")
            .set("spark.dynamicAllocation.enabled", "false")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });
    return new JavaSparkContext(conf);
}

I'm using an EMR cluster with m6a.16xlarge (64 vCores, 259 GB RAM) as the EC2 instance type.
Can anyone help me with this?
Thanks in advance.

h7appiyu1#

CSV schema inference works by "reading the whole file and working out the type of each column". There is a bit of sampling, but in practice it amounts to the same thing: one or more HTTP GET requests that *stream through the entire file* before any computation starts.
Ideally, convert the CSV to a modern file format with a well-defined schema, such as Avro. If you can't do that, declare the schema of the CSV file explicitly in your application.
This is probably not the only cause of your problem, but it will be the first bottleneck in the processing.
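As a rough illustration, an explicit schema could look something like the sketch below. Purely as an assumption it lists only the seven columns the job uses, in that order; with a user-supplied schema Spark matches CSV columns by position, so the file's full column list, in file order, is what would actually have to be declared.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Declaring the schema up front avoids the extra pass over the S3 object
// that option("inferSchema", "true") triggers before the job proper starts.
StructType citationSchema = new StructType()
        .add("Title", DataTypes.StringType)
        .add("Authors", DataTypes.StringType)
        .add("Abstract", DataTypes.StringType)
        .add("Published Year", DataTypes.IntegerType)
        .add("Journal", DataTypes.StringType)
        .add("Study", DataTypes.StringType)
        .add("Tags", DataTypes.StringType);

Dataset<Row> fullCitations = sparkSession.read()
        .format("csv")                // Spark's built-in CSV source
        .option("header", "true")
        .schema(citationSchema)       // no inference pass over the file
        .load(fileLocationTest);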

jw5wzhpr2#

If you run iterative operations over the same data, you should use the persist() method to avoid recomputing it.
I would suggest persisting at least the fullCitations dataset after it is created.
For more information on persistence, see the documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
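For instance, a minimal sketch of caching the dataset right after it is read (MEMORY_AND_DISK is just one reasonable storage level here):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;

// Cache the parsed CSV so the two show() calls and the later filter/mapToPair
// do not each trigger another read of the file from S3.
Dataset<Row> fullCitations = sparkSession.read()
        .format("csv")                // read as in the question; schema options omitted for brevity
        .option("header", "true")
        .load(fileLocationTest)
        .persist(StorageLevel.MEMORY_AND_DISK());

// ... show(), select(), filter() and the DynamoDB write as in the question ...

fullCitations.unpersist();   // release the cached blocks once the job no longer needs them
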
Also, I would review the Spark configuration. You have a cluster with 64 vCores and 259 GB of RAM, yet you declare 25 executors, each with 5 cores and 42 GB of RAM. I'm not sure each executor needs that many resources, but if the cluster doesn't have enough resources to start all of the executors, that can hurt performance.

6uxekuva3#

I finally found the problem. In the YARN logs I saw lines like these:

23/06/14 11:26:28 WARN DynamoDBFibonacciRetryer: Retry: 1 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: TM68B1L448VTTOIVEJ2G77Q4TJVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 1 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 2HRBEQ17BV6KHT7P4FDM4TGL3JVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 2 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: D5FESCOIUVRQ764RL80IM8FEANVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 2 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 9TFNUKUGKM12B1VTIQSGRD46C7VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

The conclusion is that the database was my bottleneck: it looks like we were exceeding the DynamoDB table's provisioned throughput. To fix it, I need to change the table's provisioning configuration.
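For reference, a minimal sketch of raising the table's provisioned write capacity via the UpdateTable API, using the same AWS SDK for Java (v1) classes the job already depends on; the capacity numbers are placeholders and would have to be sized to the job's actual write rate (switching the table to on-demand billing is another option):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class RaiseCovid19CitationThroughput {
    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // Placeholder capacity values (read, write); size the write capacity to the
        // rate the Spark executors actually generate against the Covid19Citation table.
        dynamoDb.updateTable(new UpdateTableRequest()
                .withTableName("Covid19Citation")
                .withProvisionedThroughput(new ProvisionedThroughput(5L, 1000L)));
    }
}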
