Spark RDD: grouping and filtering

ct2axkht · asked 2021-07-14 · in Java

I have an RDD of objects, "labResults":

case class LabResult(patientID: String, date: Long, labName: String, value: String)

I want to transform this RDD so that it contains only one row for each combination of patientID and labName. That row should be the latest one for the combination (I am only interested in the most recent date on which the patient had this lab). This is how I did it:

// group rows by patient and lab and keep only the latest one
val cleanLab = labResults.groupBy(x => (x.patientID, x.labName)).map(_._2).map { events =>
  val latest_date = events.maxBy(_.date)
  val lab = events.filter(x => x.date == latest_date)
  lab.take(1)
}

I want to create edges from this RDD:

val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab
  .map({ lab =>
    Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
  })

I get this error:

value patientID is not a member of Iterable[edu.gatech.cse6250.model.LabResult]

[error]   Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error]            ^
[error] /hw4/stu code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:53: value labName is not a member of Iterable[edu.gatech.cse6250.model.LabResult]
[error]   Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error]                                              ^
[error] /hw4/stu code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:86: type mismatch
[error]  found   : Iterable[edu.gatech.cse6250.model.LabResult]
[error]  required: edu.gatech.cse6250.model.LabResult
[error]   Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
So the problem seems to be that "cleanLab" is not the RDD of LabResult I expected, but an RDD of Iterable[edu.gatech.cse6250.model.LabResult].
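
Tracing the element type through each step shows where the Iterable comes from (and that maxBy returns the whole record, not the date):

labResults                                  // RDD[LabResult]
  .groupBy(x => (x.patientID, x.labName))   // RDD[((String, String), Iterable[LabResult])]
  .map(_._2)                                // RDD[Iterable[LabResult]]
  .map { events =>
    val latest_date = events.maxBy(_.date)  // a LabResult, not a Long
    val lab = events.filter(x => x.date == latest_date) // compares a Long to a LabResult
    lab.take(1)                             // take(1) on an Iterable is still an Iterable
  }                                         // result: RDD[Iterable[LabResult]]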
How can I fix it?

nlejzf6q · answer #1

Here is my approach for the first part. I can't help with Edge and the other classes, because I don't know where they come from (are they from here?)

scala> val ds = List(("1", 1, "A", "value 1"), ("1", 3, "A", "value 3"), ("1", 3, "B", "value 3"), ("1", 2, "A", "value 2"), ("1", 3, "B", "value 3"), ("1", 5, "B", "value 5") ).toDF("patientID", "date", "labName", "value").as[LabResult]
ds: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, date: int ... 2 more fields]

scala> ds.show
+---------+----+-------+-------+
|patientID|date|labName|  value|
+---------+----+-------+-------+
|        1|   1|      A|value 1|
|        1|   3|      A|value 3|
|        1|   3|      B|value 3|
|        1|   2|      A|value 2|
|        1|   3|      B|value 3|
|        1|   5|      B|value 5|
+---------+----+-------+-------+

scala> val grouped = ds.groupBy("patientID", "labName").agg(max("date") as "date")
grouped: org.apache.spark.sql.DataFrame = [patientID: string, labName: string ... 1 more field]

scala> grouped.show
+---------+-------+----+
|patientID|labName|date|
+---------+-------+----+
|        1|      A|   3|
|        1|      B|   5|
+---------+-------+----+

scala> val cleanLab = ds.join(grouped, Seq("patientID", "labName", "date")).as[LabResult]
cleanLab: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, labName: string ... 2 more fields]

scala> cleanLab.show
+---------+-------+----+-------+
|patientID|labName|date|  value|
+---------+-------+----+-------+
|        1|      A|   3|value 3|
|        1|      B|   5|value 5|
+---------+-------+----+-------+

scala> cleanLab.head
res45: LabResult = LabResult(1,3,A,value 3)
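
For the Edge part of the question, a minimal sketch (assuming the lab2VertexId, PatientLabEdgeProperty, and EdgeProperty definitions from the question compile): Dataset#rdd turns cleanLab back into an RDD[LabResult], so each element is a single record rather than an Iterable:

import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD

// cleanLab.rdd is an RDD[LabResult], so lab below is one record, not a group
val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab.rdd.map { lab =>
  Edge(lab.patientID.toLong, lab2VertexId(lab.labName),
    PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
}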

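Alternatively, a sketch that stays entirely in the RDD API: key by (patientID, labName) and reduce each group to the record with the greatest date, which keeps the element type as LabResult:

// one record per (patientID, labName): keep the row with the latest date
val cleanLab: RDD[LabResult] = labResults
  .map(x => ((x.patientID, x.labName), x))
  .reduceByKey((a, b) => if (a.date >= b.date) a else b)
  .values

With this version cleanLab is already an RDD[LabResult], so the edge mapping above applies without the .rdd conversion. One difference: if two rows share the same maximum date, reduceByKey keeps only one of them, while the join-based approach keeps both.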
