我对spark流有一个问题(spark 2.2.1)。我正在开发一个实时管道,首先从kafka获取数据,然后将结果与另一个表连接,然后将Dataframe发送到als模型(spark-ml),然后它返回一个带有一个额外列predit的流Dataframe。问题是,当我试图获得最高分数的那一行时,我找不到解决它的方法。
我试过:
应用sql函数,如 Limit
, Take
,
sort dense_rank()
功能
在stackoverflow中搜索
我阅读了不支持的操作,但似乎没有太多。
加上我将发送到Kafka队列的最高分数
我的代码如下:
val result = lines.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", mySchema).as("data"))
//.select("data.*")
.selectExpr("cast(data.largo as int) as largo","cast(data.stock as int) as stock","data.verificavalormax","data.codbc","data.ide","data.timestamp_cli","data.tef_cli","data.nombre","data.descripcion","data.porcentaje","data.fechainicio","data.fechafin","data.descripcioncompleta","data.direccion","data.coordenadax","data.coordenaday","data.razon_social","data.segmento_app","data.categoria","data.subcategoria")
result.printSchema()
val model = ALSModel.load("ALSParaTiDos")
val fullPredictions = model.transform(result)
//fullPredictions is a streaming dataframe with a extra column "prediction", here i need the code to get the first row
val query = fullPredictions.writeStream.format("console").outputMode(OutputMode.Append()).option("truncate", "false").start()
query.awaitTermination()
更新
可能我不太清楚,所以我附上了一张关于我的问题的图片。我还编写了一个更简单的代码来补充它:https://gist.github.com/.../9193c8a983c9007e8a1b6ec280d8df25 详细说明我需要什么。我将感谢您的帮助:)
2条答案
按热度按时间2g32fytz1#
热释光;dr使用流内部连接(spark 2.3.0)或使用
memory
Flume(或Hive桌)用于临时存储。我认为下面这句话很好地描述了你的情况:
问题是,当我试图获得最高分数的那一行时,我找不到解决它的方法。
撇开机器学习不谈,因为它提供了一个具有预测的流式数据集,所以在流式数据集中,重点是在列中找到最大值才是真正的情况。
第一步是按如下方式计算最大值(直接从代码中复制):
在spark 2.3.0(几天前发布)之后,您就可以加入两个流数据集:
在spark 2.3中,我们增加了对流连接的支持,即可以连接两个流数据集/Dataframe。
支持任何类型的列上的内部联接以及任何类型的联接条件。
加入流式数据集,就完成了。
yeotifhr2#
试试这个:
实现一个函数,提取列的最大值,然后用最大值过滤Dataframe
在代码正文中添加:
祝你好运!!