我正在使用spark流媒体接收我公司的内部数据源。我按照本教程编写了一个接收器:https://spark.apache.org/docs/latest/streaming-custom-receivers.html. 但在spark ui流媒体标签中,我总是看到0条消息。而且我在驱动程序日志中没有看到任何错误。真的很困惑出了什么问题(要连接到内部数据源,需要创建 client
,那么 listen()
我怀疑是不是因为 listen
数据源上的模式?
我的接收器
class MyReceiver(val clientId: String, val token: String, val env: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
new Thread("My Data Source") { override def run() { receive() } }.start()
}
def onStop() { }
private def receive() {
while(!isStopped()) {
try {
val client = new Client(clientId, token, "STAGE")
client.connect()
client.listen(Client.Topic, new ClientMsgHandler() {
override def process(event: ClientMsg): Unit = {
val msg: String = event.getBody
store(msg)
}
override def onException(event: ClientEvent): Unit = {
}
})
} catch {
case ce: java.net.ConnectException =>
System.out.println("Could not connect")
case t: Throwable =>
System.out.println("Error receiving data")
}
}
}
}
==================================================================
创建流
class MyStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {
def creatingFunc(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(3))
// Set the active SQLContext so that we can access it statically within the foreachRDD
SQLContext.setActiveSession(sqlContext)
ssc.checkpoint(cpDir)
val ClientId = <Myclientid>
val Token = <Mytoken>
val env = "STAGE"
val stream = ssc.receiverStream(new MyReceiver(ClientId, Token, env))
stream.foreachRDD { rdd => println("Here"+rdd.take(10).mkString(", "))
}
ssc
}
}
==================================================================
开始流式处理
val checkpoint_dir = <my_checkpoint_dir>
val MyDataSourceStream = new MyStream(sc, sqlContext, checkpoint_dir)
val ssc = StreamingContext.getActiveOrCreate(checkpoint_dir, MyDataSourceStream.creatingFunc _)
ssc.start()
ssc.awaitTermination()
更新:由于它是一个内部源代码,我不能共享客户端源代码。但我已经测试过了。它适用于以下代码和消息可以正确打印出来。你能想到 Client
作为没有连接问题的外部库。
val ClientId = <myclientid>
val Token = <mytoken>
val client = new EVClient(ClientId, Token, "STAGE")
client.connect()
client.listen(Client.Topic, new ClientMsgHandler() {
override def onEvent(event: ClientMsg): Unit = {
val res = event.getBody
println(res)
}
override def onException(event: ClientEvent): Unit = {
}
})
暂无答案!
目前还没有任何答案,快来回答吧!