我是一个新手在Spark,特别是在斯卡拉,所以任何帮助是非常感谢。我有一个case类x的iterable,它是状态更新函数的一个参数,其中状态由一个tuple2和一个字符串以及case类y定义。
val updateState = (id: String, Xs: Option[Iterable[X]], state: State[Tuple2[String,Y]]) {
...
}
,其中案例类x和y的定义如下:
case class X(Elem1: String, Elem2: int, Elem3: Date, Elem4: Double, Elem5: String, Elem6: String)
case class Y(Elem1: String, Elem2: Double, Elem3: Double, Elem4: Double, Elem5: Int)
假设case类y将存储一些操作的结果值,这些操作必须在属于同一id的所有x类的属性值中按元素应用;为了更好地描述它,让我用一个dstream格式的例子:
val DStream = (Id1,ArrayBuffer(X(Id1,intA,date1,double1,xxxxx,yyyy)))
(Id2,ArrayBuffer(X(Id2,intB,date2,double2,xxxxx,yyyy),
X(Id2,intC,date2,double2,xxxxx,yyyy),
X(Id2,intC,date2,double2,xxxxx,yyyy),
X(Id2,intD,date2,double3,xxxxx,yyyy)))
(Id3,ArrayBuffer(X(Id3,intD,date3,double4,xxxxx,yyyy),
X(Id3,intE,date3,double5,xxxxx,yyyy)))
...
考虑到这一点,我想要实现的是对其中一些值进行计算,并为每个id创建一个y类,如下所示:
Y(id, avg(X(Elem4)), min(X(Elem4)), max(X(Elem4)), sum(X(Elem2)), count_Xs_for_ID)
我们的想法是最终通过使用mapwithstate将此函数应用于数据流(我碰巧知道比updateStateKey性能更好):
val stateDStream = DStream.mapWithState(StateSpec.function(updateState))
我见过应用于2个数组的方法“zip”,但我觉得它不适合这个目标;也许一个应用于xs参数的Map,使用一个应用于每个x类的函数,可以做到这一点,但是我有点迷路了,也许对于一个示例主题,我变得越来越复杂了,谁能给我一些提示或者正确地指导我实现这个目标?
谢谢你的时间,jl
暂无答案!
目前还没有任何答案,快来回答吧!