Spark custom receiver does not receive any data

ni65a41a  posted on 2021-05-29  in  Spark

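I am using Spark Streaming to receive data from my company's internal data source. I wrote a receiver by following this tutorial: https://spark.apache.org/docs/latest/streaming-custom-receivers.html. However, the Streaming tab of the Spark UI always shows 0 messages, and I don't see any errors in the driver log. I'm really confused about what is going wrong. (To connect to the internal data source you have to create a client and then call listen(). I suspect the problem might be related to this listen pattern on the data source.)
My receiver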

class MyReceiver(val clientId: String, val token: String, val env: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    new Thread("My Data Source") { override def run() { receive() } }.start()
  }

  def onStop() {  }

  private def receive() {
    while(!isStopped()) {
      try {
        val client = new Client(clientId, token, "STAGE")
        client.connect()
        client.listen(Client.Topic, new ClientMsgHandler() {
          override def process(event: ClientMsg): Unit = {
            val msg: String = event.getBody
            store(msg)
          }

          override def onException(event: ClientEvent): Unit = {
          }
        })
      } catch {
        case ce: java.net.ConnectException =>
          System.out.println("Could not connect")
        case t: Throwable =>
          System.out.println("Error receiving data")
      }
    }
  }
}
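
For comparison, here is a minimal sketch of a callback-style receiver. It is not a confirmed fix, because the real client is not shown. It assumes that listen() only registers the handler and returns immediately; if that is the case, the while loop above would create a new client on every iteration. It also forwards errors through reportError() and restart() instead of println, since the receiver runs on an executor and its println output ends up in that executor's stdout rather than in the driver log when running on a cluster. SourceClient, MsgHandler, close() and CallbackReceiver are hypothetical stand-ins for the internal library, not its actual API.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical stand-ins for the internal client library (not shown in the post).
trait MsgHandler {
  def onEvent(body: String): Unit
  def onException(error: Throwable): Unit
}

trait SourceClient {
  def connect(): Unit
  // Assumption: registers the handler and returns immediately.
  def listen(topic: String, handler: MsgHandler): Unit
  def close(): Unit
}

class CallbackReceiver(newClient: () => SourceClient, topic: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Created on the executor in onStart(), so it is never serialized with the receiver.
  @transient @volatile private var client: SourceClient = _

  override def onStart(): Unit = {
    // onStart() must not block, so connect on a separate thread.
    new Thread("callback-receiver") {
      override def run(): Unit = connectOnce()
    }.start()
  }

  override def onStop(): Unit = {
    val c = client
    if (c != null) c.close()
    client = null
  }

  // Connect and register the handler exactly once; no polling loop.
  private def connectOnce(): Unit = {
    try {
      val c = newClient()
      client = c
      c.connect()
      c.listen(topic, new MsgHandler {
        // Hand each message to Spark.
        override def onEvent(body: String): Unit = store(body)
        // Surface client-side errors instead of silently dropping them.
        override def onException(error: Throwable): Unit =
          reportError("Client reported an error", error)
      })
    } catch {
      // Ask Spark to restart the receiver and record the cause.
      case e: Exception => restart("Could not connect or listen", e)
    }
  }
}
```

Under these assumptions it would be wired in with ssc.receiverStream(new CallbackReceiver(factory, topic)), where factory is a serializable function that builds the real client on the executor. Errors passed to reportError() and restart() are reported back to the driver by Spark, which should make a failing connection easier to spot.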

==================================================================
Creating the stream

class MyStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  def creatingFunc(): StreamingContext = {

    val ssc = new StreamingContext(sc, Seconds(3))

    // Set the active SQLContext so that we can access it statically within the foreachRDD
    SQLContext.setActiveSession(sqlContext)

    ssc.checkpoint(cpDir)

    val ClientId = <Myclientid>
    val Token = <Mytoken>
    val env = "STAGE"
    val stream = ssc.receiverStream(new MyReceiver(ClientId, Token, env))

    stream.foreachRDD { rdd =>  println("Here"+rdd.take(10).mkString(", "))

    }
    ssc
  }
}

==================================================================
Starting the streaming job

val checkpoint_dir = <my_checkpoint_dir>
val MyDataSourceStream = new MyStream(sc, sqlContext, checkpoint_dir)
val ssc = StreamingContext.getActiveOrCreate(checkpoint_dir, MyDataSourceStream.creatingFunc _)
ssc.start()
ssc.awaitTermination()

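Update: since this is an internal source, I can't share the client source code. However, I have tested the client on its own: it works with the following code, and the messages are printed correctly. You can think of Client as an external library that has no connection problems.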

val ClientId = <myclientid>
val Token = <mytoken>
val client = new EVClient(ClientId, Token, "STAGE")
client.connect()
client.listen(Client.Topic, new ClientMsgHandler() {
  override def onEvent(event: ClientMsg): Unit = {
    val res = event.getBody
    println(res)
  }
  override def onException(event: ClientEvent): Unit = {
  }
})
