在我们的应用程序中,我们使用mapgroupwithstate的结构化流,并结合kafka的读取。
启动应用程序后,在最初的批处理期间,如果我看到kafka lastprogress几乎每秒65k,则性能良好。几批之后,性能完全降低到每秒2000个左右。
在mapgroupwithstate函数中,基本上是对状态存储中的值进行更新和比较(下面提供了代码片段)。
Kafka偏移量-100000
启动应用程序后,在最初的批处理期间,如果我看到kafka lastprogress几乎每秒65k,则性能良好。几批之后,性能完全降低到每秒2000个左右。
如果我们看到其中一个executor的线程转储,那么除了sparkui的阻塞线程外,没有可疑的
其中一个执行者的gc统计如下
gc后没有发现太大的差异
代码段
case class MonitoringEvent(InternalID: String, monStartTimestamp: Timestamp, EndTimestamp: Timestamp, Stream: String, ParentID: Option[String])
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", Config.uatKafkaUrl)
.option("subscribe", Config.interBranchInputTopic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "true")
.option("maxOffsetsPerTrigger", "100000")
.option("request.required.acks", "all")
.load()
.selectExpr("CAST(value AS STRING)")
val me: Dataset[MonitoringEvent] = df.select(from_json($"value", schema).as("data")).select($"data.*").as[MonitoringEvent]
val IB = me.groupByKey(x => (x.ParentID.getOrElse(x.InternalID)))
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(IBTransformer.mappingFunctionIB _)
.flatMap(x => x)
val IBStream = IB
.select(to_json(struct($"*")).as("value"), $"InternalID".as("key"))
.writeStream
.format("kafka")
.queryName("InterBranch_Events_KafkaWriter")
.option("kafka.bootstrap.servers", Config.uatKafkaUrl)
.option("topic", Config.interBranchTopicComplete)
.option("checkpointLocation", Config.interBranchCheckPointDir)
.outputMode("update")
.start()
object IBTransformer extends Serializable {
case class IBStateStore(InternalID: String, monStartTimestamp: Timestamp)
def mappingFunctionIB(intrKey: String, intrValue: Iterator[MonitoringEvent], intrState: GroupState[IBStateStore]): Seq[MonitoringEvent] = {
try {
if (intrState.hasTimedOut) {
intrState.remove()
Seq.empty
} else {
val events = intrValue.toSeq
if (events.map(_.Status).contains(Started)) {
val tmp = events.filter(x => (x.Status == Started && x.InternalID == intrKey)).head
val toStore = IBStateStore(tmp.InternalID, tmp.monStartTimestamp)
intrState.update(toStore)
intrState.setTimeoutDuration(1200000)
}
val IB = events.filter(_.ParentID.isDefined)
if (intrState.exists && IB.nonEmpty) {
val startEvent = intrState.get
val IBUpdate = IB.map {x => x.copy(InternalID = startEvent.InternalID, monStartTimestamp = startEvent.monStartTimestamp) }
IBUpdate.foreach(id => intrState.update((IBStateStore(id.InternalID, id.monStartTimestamp)))) // updates the state with new IDs
IBUpdate
} else {
Seq.empty
}
}
}
catch
.
.
.
}
}
使用的执行程序数-8执行程序内存-8g驱动程序内存-8g
我在spark提交脚本中提供的java选项和内存
--executor-memory 8G \
--executor-cores 8 \
--num-executors 4 \
--driver-memory 8G \
--driver-java-options "-Dsun.security.krb5.debug=true -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dconfig.file=configIB.conf -Dlog4j.configuration=IBprocessor.log4j.properties" \
尝试在java选项中使用g1gc,但没有改进。我们持有的钥匙也小于提供的尺寸,所以不确定哪里出了问题。
有什么改进性能和消除gc问题的建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!