I have a Spark Kafka streaming job. Below is the main processing logic of the job.
val processedStream = rawStream.transform(x => {
  val offsetRanges = x.asInstanceOf[HasOffsetRanges].offsetRanges
  val spark = SparkSession.builder.config(x.sparkContext.getConf).getOrCreate()
  val parsedRDD = x.map(cr => cr.value())
  var df = spark.sqlContext.read.schema(KafkaRawEvent.getStructure()).json(parsedRDD)

  // Explode the events array into individual events
  if (DFUtils.hasColumn(df, "events")) {
    // Rename the top-level dow and hour so they don't clash with the event fields
    if (DFUtils.hasColumn(df, "dow"))
      df = df.withColumnRenamed("dow", "hit-dow")
    if (DFUtils.hasColumn(df, "hour"))
      df = df.withColumnRenamed("hour", "hit-hour")

    df = df
      .withColumn("event", explode(col("events")))
      .drop("events")

    if (DFUtils.hasColumn(df, "event.key")) {
      df = df.select(
        "*", "event.key",
        "event.count", "event.hour",
        "event.dow", "event.sum",
        "event.timestamp",
        "event.segmentation")
    }

    if (DFUtils.hasColumn(df, "key")) {
      df = df.filter("key != '[CLY]_view'")
    }

    df = df.select("*", "segmentation.*")
      .drop("segmentation")
      .drop("event")

    if (DFUtils.hasColumn(df, "metrics")) {
      df = df.select("*", "metrics.*").drop("metrics")
    }

    df = df.withColumnRenamed("timestamp", "eventTimeString")
    df = df
      .withColumn("eventtimestamp", df("eventTimeString").cast(LongType).divide(1000).cast(TimestampType).cast(DateType))
      .withColumn("date", current_date())

    if (DFUtils.hasColumn(df, "restID")) {
      df = df.join(broadcast(restroCached), df.col("restID") === restro.col("main_r_id"), "left_outer")
    }

    val SAVE_PATH = Conf.getSavePath()

    // Write the DataFrame to parquet, partitioned by date
    df.write.partitionBy("date").mode("append").parquet(SAVE_PATH)

    // Filter out app-launch events, keep distinct adIds and push them to Kafka
    val columbiaDf = df.filter(col("adId").isNotNull)
      .select(col("adId")).distinct().toDF("cui").toJSON

    // Push the columbia df to Kafka for further processing
    columbiaDf.foreachPartition(partitionOfRecords => {
      val factory = columbiaProducerPool.value
      val producer = factory.getOrCreateProducer()
      partitionOfRecords.foreach(record => producer.send(record))
    })

    // Push the full processed events to Kafka as well
    df.toJSON.foreachPartition(partitionOfRecords => {
      val factory = producerPool.value
      val producer = factory.getOrCreateProducer()
      partitionOfRecords.foreach(record => producer.send(record))
    })
  }

  rawStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  df.toJSON.rdd
})
val windowOneHourEveryMinute = processedStream.window(Minutes(60), Seconds(60))

windowOneHourEveryMinute.foreachRDD(windowRDD => {
  val prefix = Conf.getAnalyticsPrefixesProperties().getProperty("view.all.last.3.hours")
  val viewCount = spark.sqlContext.read.schema(ProcessedEvent.getStructure()).json(windowRDD)
    .filter("key == 'RestaurantView'")
    .groupBy("restID")
    .count()
    .rdd.map(r => (prefix + String.valueOf(r.get(0)), String.valueOf(r.get(1))))
  spark.sparkContext.toRedisKV(viewCount, Conf.getRedisKeysTTL())
})

streamingContext.start()
streamingContext.awaitTermination()
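For reference, DFUtils.hasColumn is not shown in the snippet; a minimal sketch of what such a helper typically looks like (an assumption on my side, not necessarily the poster's implementation):

import scala.util.Try
import org.apache.spark.sql.DataFrame

object DFUtils {
  // Returns true if the (possibly nested) column path resolves against the DataFrame's schema
  def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess
}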
This job had been running for almost a month without a single failure. Now, even though it is not processing any events, the processing time has suddenly started to grow exponentially.
I can't figure out why this is happening. I am attaching screenshots of the application master below.
Below is the graph of the processing time:
From the "Jobs" tab of the Spark UI, most of the time is spent on the line .rdd.map(r => (prefix + String.valueOf(r.get(0)), String.valueOf(r.get(1)))).
Below is the DAG of the stage:
This is only a small part of the DAG; the actual DAG is quite large, but these same tasks repeat once for every RDD in the window. That is, I run a 1-minute batch, so a 30-minute window contains 30 identical, repeated sets of tasks.
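To make that window/batch relationship concrete, here is a small illustration (a sketch of the arithmetic only, not the poster's code; it assumes the 1-minute batch interval mentioned above):

import org.apache.spark.streaming.{Minutes, Seconds}

// Each windowed RDD is built from every parent batch RDD that falls inside the window,
// so the same transform lineage shows up once per batch in the stage DAG.
val batchInterval = Seconds(60)
val batchesIn30MinWindow = Minutes(30).milliseconds / batchInterval.milliseconds // 30 repeated task sets
val batchesIn60MinWindow = Minutes(60).milliseconds / batchInterval.milliseconds // 60, for the window configured above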
Is there any specific reason why the processing time would suddenly start growing exponentially?
Spark version: 2.2.0, Hadoop version: 2.7.3
Note: I am running this job on an EMR 5.8 cluster, with 1 driver of 2.5 GB and 1 executor of 3.5 GB.
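For completeness, rawStream and streamingContext are also not shown in the snippet. With Spark 2.2 and the spark-streaming-kafka-0-10 integration they would typically be created along these lines (a sketch under assumptions: the app name, broker list, group id and topic name below are placeholders, not values from the post):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val sparkConf = new SparkConf().setAppName("kafka-events-job") // app name is a placeholder
// 1-minute batch interval, matching the batch duration mentioned in the question
val streamingContext = new StreamingContext(sparkConf, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",             // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "events-consumer-group",              // placeholder
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)  // offsets are committed manually via commitAsync
)

// Direct stream of ConsumerRecord[String, String]; cr.value() in the transform reads the JSON payload
val rawStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](Seq("raw-events"), kafkaParams) // topic name is a placeholder
)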