Spark Kafka streaming processing time growing exponentially

kgqe7b3p  asked on 2021-05-29 in Hadoop

I have a Spark Kafka streaming job. Below is the main processing logic of the job.

val processedStream = rawStream.transform(x => {
val offsetRanges = x.asInstanceOf[HasOffsetRanges].offsetRanges;
val spark = SparkSession.builder.config(x.sparkContext.getConf).getOrCreate();
val parsedRDD = x.map(cr => cr.value());

var df = spark.sqlContext.read.schema(KafkaRawEvent.getStructure()).json(parsedRDD);

// Explode the events array into individual event rows
if (DFUtils.hasColumn(df, "events")) {
    // Rename top-level dow and hour so they don't clash with the exploded event.dow and event.hour
    if (DFUtils.hasColumn(df, "dow"))
        df = df.withColumnRenamed("dow", "hit-dow");
    if (DFUtils.hasColumn(df, "hour"))
        df = df.withColumnRenamed("hour", "hit-hour");

    df = df
        .withColumn("event", explode(col("events")))
        .drop("events");

    if (DFUtils.hasColumn(df, "event.key")) {
        df = df.select(
            "*", "event.key",
            "event.count", "event.hour",
            "event.dow", "event.sum",
            "event.timestamp",
            "event.segmentation");
    }

    if (DFUtils.hasColumn(df, "key")) {
        df = df.filter("key != '[CLY]_view'");
    }

    df = df.select("*", "segmentation.*")
        .drop("segmentation")
        .drop("event");

    if (DFUtils.hasColumn(df, "metrics")) {
        df = df.select("*", "metrics.*").drop("metrics");
    }

    df = df.withColumnRenamed("timestamp", "eventTimeString");
    df = df.withColumn("eventtimestamp", df("eventTimeString").cast(LongType).divide(1000).cast(TimestampType).cast(DateType))
        .withColumn("date", current_date());

    if (DFUtils.hasColumn(df, "restID")) {
        df = df.join(broadcast(restroCached), df.col("restID") === restro.col("main_r_id"), "left_outer");
    }

    val SAVE_PATH = Conf.getSavePath();

    // Write the dataframe to Parquet, partitioned by date
    df.write.partitionBy("date").mode("append").parquet(SAVE_PATH);

    // Keep rows with a non-null adId (drops app-launch events), take distinct adIds as "cui", and push them to Kafka
    val columbiaDf = df.filter(col("adId").isNotNull)
        .select(col("adId")).distinct().toDF("cui").toJSON;

    // push columbia df to kafka for further processing
    columbiaDf.foreachPartition(partitionOfRecords => {
        val factory = columbiaProducerPool.value;
        val producer = factory.getOrCreateProducer();
        partitionOfRecords.foreach(record => {
            producer.send(record);
        });
    });

    df.toJSON
        .foreachPartition(partitionOfRecords => {
            val factory = producerPool.value;
            val producer = factory.getOrCreateProducer();
            partitionOfRecords.foreach(record => {
                producer.send(record);
            });
        });
}
rawStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
df.toJSON.rdd;
});

val windowOneHourEveryMinute = processedStream.window(Minutes(60), Seconds(60));

windowOneHourEveryMinute.foreachRDD(windowRDD => {
    val prefix = Conf.getAnalyticsPrefixesProperties().getProperty("view.all.last.3.hours");

    // Count 'RestaurantView' events per restID over the window and write the counts to Redis
    val viewCount = spark.sqlContext.read.schema(ProcessedEvent.getStructure()).json(windowRDD)
        .filter("key == 'RestaurantView'")
        .groupBy("restID")
        .count()
        .rdd.map(r => (prefix + String.valueOf(r.get(0)), String.valueOf(r.get(1))));
    spark.sparkContext.toRedisKV(viewCount, Conf.getRedisKeysTTL());
});

streamingContext.start();
streamingContext.awaitTermination();
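
DFUtils.hasColumn is not shown in the post; a minimal sketch of what such a helper typically looks like, assuming it simply checks whether a (possibly nested, dot-separated) column path resolves against the schema (an illustration, not the poster's actual implementation):

import org.apache.spark.sql.DataFrame
import scala.util.Try

object DFUtils {
    // Try(df(path)) succeeds only when the analyzer can resolve the column
    // reference, which also covers nested paths such as "event.key".
    def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess
}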

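The producer pools (producerPool, columbiaProducerPool) are not shown either; from the usage they look like broadcast factories that lazily create one Kafka producer per executor. A minimal sketch of that pattern, assuming String keys/values and a fixed target topic (the class name and constructor below are hypothetical):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Broadcast-friendly wrapper: the KafkaProducer is @transient lazy, so it is
// created on first use inside each executor rather than being serialized.
class TopicProducerFactory(config: Properties, topic: String) extends Serializable {
    @transient private lazy val kafkaProducer = new KafkaProducer[String, String](config)

    // Matches the factory.getOrCreateProducer() call used in the job
    def getOrCreateProducer(): TopicProducerFactory = this

    // send(record) as used in the job: the payload is one row serialized as JSON
    def send(value: String): Unit = kafkaProducer.send(new ProducerRecord[String, String](topic, value))
}
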
This job ran for almost a month without a single failure. Now, even though no events are being processed, the processing time has suddenly started to grow exponentially.
I cannot figure out why this is happening. Below is a screenshot of the Application Master.


Below is the graph of the processing times:

From the Jobs tab of the Spark UI, most of the time is spent on the line .rdd.map(r => (prefix + String.valueOf(r.get(0)), String.valueOf(r.get(1))));
Below is the DAG of the stage:

This is only a small part of the DAG. The actual DAG is quite large, but the same tasks are repeated once for every RDD in the window; that is, I run a 1-minute batch, so a 30-minute window has 30 identical repeated tasks.
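
If the repetition comes from each 60-minute window re-evaluating the full transform lineage of every 1-minute batch it contains, would persisting the processed stream before windowing (plus checkpointing to truncate the lineage) be the right direction? A rough sketch of what I have in mind; Conf.getCheckpointDir() is a placeholder, not something that exists in my Conf:

import org.apache.spark.storage.StorageLevel

// Materialize each processed batch once so the window reuses the cached RDDs
// instead of recomputing the whole transform for every batch it contains.
processedStream.persist(StorageLevel.MEMORY_AND_DISK_SER)

// With a checkpoint directory set, periodically checkpoint the stream to cut
// the lineage that otherwise keeps growing across windows.
streamingContext.checkpoint(Conf.getCheckpointDir())  // placeholder accessor
processedStream.checkpoint(Minutes(10))

val windowOneHourEveryMinute = processedStream.window(Minutes(60), Seconds(60))
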
Is there any specific reason why the processing time suddenly started to grow exponentially?
Spark version: 2.2.0, Hadoop version: 2.7.3
Note: I am running this job on an EMR 5.8 cluster with one driver (2.5 GB) and one executor (3.5 GB).
