我正在尝试使用mapgroupswithstate方法对传入的数据流进行有状态结构化流处理。但我面临的问题是,我为groupbykey选择的键使我的状态变得太大太快。最明显的解决方法是更改密钥,但我希望在update方法中应用的业务逻辑要求密钥与我现在的密钥完全相同,或者如果可能的话,访问所有密钥的groupstate。
例如,我有一个来自不同组织的数据流,通常一个组织包含userid、personid等。请参见下面的代码:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation", s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
案例类别:
case class User(
orgId: Long,
profileId: Long,
userId: Long)
case class UserStatistic(
orgId: Long,
known: Long,
uknown: Long,
userSeq: Seq[User])
更新方法:
def updateUserStatistic(
orgId: Long,
newEvents: Iterator[User],
oldState: GroupState[UserStatistic]): UserStatistic = {
var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId, 0L, 0L, Seq.empty)
for (event <- newEvents) {
//business logic like checking if userId in this organization is of certain type and then accordingly update the known or unknown attribute for that particular user.
oldState.update(state)
state
}
当我必须在driver-executor模型上执行此操作时,问题会变得更糟,因为我预计每个组织都有100万到1000万用户,这可能意味着单个executor上有这么多状态(如果我理解错误,请纠正我)
失败的可能解决方案:
按键分组作为用户id-因为这样我就无法获得给定组织id的所有用户id,因为这些GroupState放在聚合键、值对中,这里是用户id。因此,对于每个新的userid,都会创建一个新的状态,即使它属于同一个组织。
如有任何帮助或建议,我们将不胜感激。
1条答案
按热度按时间j8ag8udp1#
您的状态的大小一直在增加,因为在当前的实现中,不会从groupstate中删除任何键/状态对。
为了准确地缓解您所面临的问题(无限增长状态)
mapGroupsWithState
方法允许您使用超时。您可以选择两种类型的超时:使用处理超时
GroupStateTimeout.ProcessingTimeTimeout
与GroupState.setTimeoutDuration()
,或事件超时使用
GroupStateTimeout.EventTimeTimeout
与GroupState.setTimeoutTimestamp()
.注意它们之间的区别是基于持续时间的超时和更灵活的基于时间的超时。
在特征的尺度上
GroupState
您将找到一个关于如何在Map函数中使用超时的好模板: