Suppose I have a stream of events as JSON, e.g.

{"id" : "IdOfUser" , "title": "Title1"}

which gets converted into a case class:

case class UserEvent(id: Int, title: String)
I also have a collection in MongoDB that holds metadata for each title. It acts as a lookup table, e.g.:
val lookup_df = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
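Each document in TitleMetaData pairs a title with its metadata; for the purposes of this question the relevant shape is roughly (the size value here is just illustrative):

{"title" : "Title1", "size" : 3}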
I'm also tracking state with mapGroupsWithState (https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html).
My stateful session looks like this:
case class UserSession(var visited: collection.Map[String, Boolean],
                       var size: Int)
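Both fields are vars because the idea is to mutate the session in place as events arrive, along the lines of (an illustrative sketch, not my real update logic):

// hypothetical update step: record the visited title and grow the session
state.visited = state.visited + (event.title -> true)
state.size = state.size + 1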
My main problem seems to be that whenever I try to run an action inside the map function, the lookup table disappears. When I read the DataFrame in as a global variable on an object, as in the code below, I get a NullPointerException.
object StreamState {
  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("StreamState")
    .getOrCreate()

  val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(
    Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
}
This StreamState object has the following functions (updateUserStateWithEvent throws the error when collect runs). The error happens here:

def updateUserStateWithEvent(statez: UserSession, event: UserEvent): UserSession = {
  import sparkSession.implicits._
  val current_event = lookup_df.filter($"title" === event.url)
  // this collect line gives me the error
  val size = current_event.select("size").as[Int].collect()(0)
  val empty_map = Map[String, Boolean]()
  statez.visited = empty_map
  statez.size = size
  statez
}
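For what it's worth, the only workaround I can think of is to materialize the lookup table into a plain Scala Map on the driver before the stream starts, so the state-update function never runs a DataFrame action inside the executor closure. A rough sketch, assuming the table fits in driver memory (titleSizes is a name I made up):

import sparkSession.implicits._

// Collect the lookup table to the driver once, before any streaming query starts.
// The resulting Map is a plain serializable object, so it can be captured in the closure.
val titleSizes: scala.collection.Map[String, Int] = lookup_df
  .select($"title".as[String], $"size".as[Int])
  .collect()
  .toMap

def updateUserStateWithEvent(statez: UserSession, event: UserEvent): UserSession = {
  // plain Map lookup instead of a DataFrame action inside mapGroupsWithState
  statez.size = titleSizes.getOrElse(event.url, 0)
  statez.visited = Map[String, Boolean]()
  statez
}

But I'd still like to understand why the DataFrame version blows up.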
I've deliberately kept this helper function bare-bones so we don't get bogged down in the logic: the state's size field is meant to be updated from the table read in at the start (from MongoDB). Here is my map function:
def updateAcrossEvents(user: Int,
                       events: Iterator[UserEvent],
                       oldState: GroupState[UserSession]): UserSession = {
  var state: UserSession = if (oldState.exists) {
    println(oldState.get.visited)
    oldState.get
  } else {
    val empty_map = Map[String, Boolean]()
    val empty_session = UserSession(empty_map, 0)
    empty_session
  }
  import sparkSession.implicits._
  for (event <- events) {
    state = updateUserStateWithEvent(state, event)
    oldState.update(state)
  }
  state
}
Finally, here is the main function of my StreamState object (adapted from https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/; you can find deserializeUserEvent in that blog post):
def main(args: Array[String]): Unit = {
  sparkSession.sparkContext.setLogLevel("WARN")
  import sparkSession.implicits._

  val userEventsStream = sparkSession.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 12345)
    .load()
    .as[String]

  val finishedUserSessionsStream: Dataset[UserSession] =
    userEventsStream
      .map(deserializeUserEvent)
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents)

  finishedUserSessionsStream.writeStream
    .outputMode(OutputMode.Update())
    .format("console")
    .option("checkpointLocation", "checkpoint")
    .option("truncate", false)
    .start()
    .awaitTermination()
}
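For testing I feed the socket source one JSON event per line through netcat (nc -lk 12345), e.g. a line like (illustrative values):

{"id" : 1, "url" : "Title1"}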
This gives me an aborted task with java.lang.NullPointerException.
I'd like to know why this error happens and how to fix it. Does it have something to do with our map function taking an Iterator of events, or do global variables simply not work in the context of a stream?
Here is the full code you can reproduce it from:
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, SparkSession}
import user.{UserEvent, UserSession, USession}
import scala.collection.{mutable, _}
import argonaut.Argonaut._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import scala.collection.Map

object StreamStateThree {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("StreamStateThree")
    //.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/blog.articles")
    //.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/blog.vectors")
    .getOrCreate()

  sparkSession.sparkContext.setLogLevel("WARN")
  import sparkSession.implicits._
  val sc = sparkSession.sparkContext

  val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(
    Map("uri" -> "mongodb://000.000.000.000:27017/blog.cleanVectors")))
  sparkSession.sparkContext.broadcast(new_data)

  def updateUserStateWithEvent(statez: USession, event: UserEvent): USession = {
    println("Updating")
    val empty_map = Map[String, Boolean]()
    val empty_rec: Array[String] = Array("")
    val current_event = new_data.filter($"title" === event.url)
    // this filter + collect on the lookup DataFrame is what throws the NullPointerException
    val size: Int = current_event.select("size").as[Int].collect()(0)
    //.limit(1)
    val empty_session = USession(empty_map, -7)
    empty_session
  }

  def updateAcrossEvents(user: Int,
                         events: Iterator[UserEvent],
                         oldState: GroupState[USession]): USession = {
    var state: USession = if (oldState.exists) {
      println("State exists with the following visited")
      oldState.get
    } else {
      println("State does not exist")
      val empty_map = Map[String, Boolean]()
      val empty_session = USession(empty_map, -7)
      empty_session
    }
    // we simply specify an old date that we can compare against and
    // immediately update based on the values in our data
    for (event <- events) {
      state = updateUserStateWithEvent(state, event)
      oldState.update(state)
    }
    state
  }

  def deserializeUserEvent(json: String): UserEvent = {
    json.decodeEither[UserEvent] match {
      case Right(userEvent) => userEvent
      case Left(error) =>
        println(s"Failed to parse user event: $error")
        UserEvent.empty
    }
  }

  def main(args: Array[String]): Unit = {
    //new_data2.show(20,false)
    val userEventsStream = sparkSession.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 12346)
      .load()
      .as[String]

    val finishedUserSessionsStream =
      userEventsStream
        .map(deserializeUserEvent)
        .groupByKey(_.id)
        .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
          updateAcrossEvents)

    finishedUserSessionsStream.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("checkpointLocation", "checkpoint")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}
Here is the session case class for reference:
package user

import org.apache.spark.sql.Dataset
import scala.collection.Map

case class USession(var visited: collection.Map[String, Boolean],
                    var size: Int)
and the event:
package user

import argonaut.Argonaut._
import argonaut.CodecJson

case class UserEvent(id: Int, url: String)

object UserEvent {
  implicit def codec: CodecJson[UserEvent] =
    casecodec2(UserEvent.apply, UserEvent.unapply)("id", "url")

  lazy val empty = UserEvent(-1, "")
}
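As a sanity check, the codec itself should round-trip outside the stream (assumed Argonaut usage, not something from my pipeline):

import argonaut.Argonaut._
import user.UserEvent

// encode a UserEvent to JSON; should produce something like {"id":1,"url":"Title1"}
val rendered = UserEvent(1, "Title1").asJson.nospaces

// decode it back: Right(UserEvent(...)) on success, Left(error message) on failure
val parsed = rendered.decodeEither[UserEvent]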