如何在spark结构化流媒体中排名第一?

brvekthn  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(289)

我对spark流有一个问题(spark 2.2.1)。我正在开发一个实时管道,首先从kafka获取数据,然后将结果与另一个表连接,然后将Dataframe发送到als模型(spark-ml),然后它返回一个带有一个额外列predit的流Dataframe。问题是,当我试图获得最高分数的那一行时,我找不到解决它的方法。
我试过:
应用sql函数,如 Limit , Take ,
sort dense_rank() 功能
在stackoverflow中搜索
我阅读了不支持的操作,但似乎没有太多。
加上我将发送到Kafka队列的最高分数
我的代码如下:

val result = lines.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", mySchema).as("data"))
//.select("data.*")
.selectExpr("cast(data.largo as int) as largo","cast(data.stock as int) as stock","data.verificavalormax","data.codbc","data.ide","data.timestamp_cli","data.tef_cli","data.nombre","data.descripcion","data.porcentaje","data.fechainicio","data.fechafin","data.descripcioncompleta","data.direccion","data.coordenadax","data.coordenaday","data.razon_social","data.segmento_app","data.categoria","data.subcategoria")
result.printSchema()

val model = ALSModel.load("ALSParaTiDos")

val fullPredictions = model.transform(result)

//fullPredictions is a streaming dataframe with a extra column "prediction", here i need the code to get the first row

val query = fullPredictions.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").start()
   query.awaitTermination()

更新
可能我不太清楚,所以我附上了一张关于我的问题的图片。我还编写了一个更简单的代码来补充它:https://gist.github.com/.../9193c8a983c9007e8a1b6ec280d8df25 详细说明我需要什么。我将感谢您的帮助:)

2g32fytz

2g32fytz1#

热释光;dr使用流内部连接(spark 2.3.0)或使用 memory Flume(或Hive桌)用于临时存储。
我认为下面这句话很好地描述了你的情况:
问题是,当我试图获得最高分数的那一行时,我找不到解决它的方法。
撇开机器学习不谈,因为它提供了一个具有预测的流式数据集,所以在流式数据集中,重点是在列中找到最大值才是真正的情况。
第一步是按如下方式计算最大值(直接从代码中复制):

streaming.groupBy("idCustomer").agg(max("score") as "maxscore")

在spark 2.3.0(几天前发布)之后,您就可以加入两个流数据集:
在spark 2.3中,我们增加了对流连接的支持,即可以连接两个流数据集/Dataframe。
支持任何类型的列上的内部联接以及任何类型的联接条件。
加入流式数据集,就完成了。

yeotifhr

yeotifhr2#

试试这个:
实现一个函数,提取列的最大值,然后用最大值过滤Dataframe

def getDataFrameMaxRow(df:DataFrame , col:String):DataFrame = {
 // get the maximum value
 val list_prediction = df.select(col).toJSON.rdd
         .collect()
         .toList
         .map { x =>  gson.fromJson[JsonObject](x, classOf[JsonObject])}
         .map { x => x.get(col).getAsString.toInt}
 val max = getMaxFromList(list_prediction)

 // filter dataframe by the maximum value
 val df_filtered = df.filter(df(col) === max.toString())

 return df_filtered
}

def getMaxFromList(xs: List[Int]): Int = xs match {
 case List(x: Int) => x
 case x :: y :: rest => getMaxFromList( (if (x > y) x else y) :: rest )
}

在代码正文中添加:

import com.google.gson.JsonObject
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame

val fullPredictions = model.transform(result)
val df_with_row_max = getDataFrameMaxRow(fullPredictions, "prediction")

祝你好运!!

相关问题