NullPointerException

Posted by vuktfyat on 2021-06-25 in Flink

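I want to create a SourceFunction that reads an HTTP stream. I used scalaj, which does what I want: it splits the incoming text on \n. The code clearly works outside Flink, but every time I launch it as a Flink job I get a NullPointerException (sometimes only after 1-2 seconds, once it has already emitted 1-2 elements). It looks like something is wrong with the Http object.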

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.io.Source.fromInputStream
import scalaj.http._

class HttpSource(url: String) extends SourceFunction[String] {

  // Cleared by Flink (through cancel()) to stop the read loop.
  @volatile var isRunning = true
  override def cancel(): Unit = isRunning = false

  // Emit every line of the HTTP response body as one record.
  override def run(ctx: SourceFunction.SourceContext[String]): Unit =
    httpStream(ctx.collect)

  private def httpStream(f: String => Unit) = {
    val request = Http(url)
    request
      .execute { inputStream =>
        // scalaj hands the response body stream to this callback;
        // getLines() splits it on line breaks.
        fromInputStream(inputStream)
          .getLines()
          .takeWhile(_ => isRunning)
          .foreach(f)
      }
  }
}

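I usually get the following exception (sometimes it differs slightly; for example, when I tried making the request value transient, it was already null by the time the code tried to reference the request):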

Caused by: java.lang.NullPointerException
    at java.io.Reader.<init>(Reader.java:78)
    at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
    at scala.io.BufferedSource.reader(BufferedSource.scala:24)
    at scala.io.BufferedSource.bufferedReader(BufferedSource.scala:25)
    at scala.io.BufferedSource.scala$io$BufferedSource$$charReader$lzycompute(BufferedSource.scala:35)
    at scala.io.BufferedSource.scala$io$BufferedSource$$charReader(BufferedSource.scala:33)
    at scala.io.BufferedSource.scala$io$BufferedSource$$decachedReader(BufferedSource.scala:62)
    at scala.io.BufferedSource$BufferedLineIterator.<init>(BufferedSource.scala:67)
    at scala.io.BufferedSource.getLines(BufferedSource.scala:86)
    at flinkextension.HttpSource$$anonfun$httpStream$1.apply(HttpSource.scala:21)
    at flinkextension.HttpSource$$anonfun$httpStream$1.apply(HttpSource.scala:19)
    at scalaj.http.HttpRequest$$anonfun$execute$1.apply(Http.scala:323)
    at scalaj.http.HttpRequest$$anonfun$execute$1.apply(Http.scala:323)
    at scalaj.http.HttpRequest$$anonfun$toResponse$3.apply(Http.scala:388)
    at scalaj.http.HttpRequest$$anonfun$toResponse$3.apply(Http.scala:380)
    at scala.Option.getOrElse(Option.scala:121)
    at scalaj.http.HttpRequest.toResponse(Http.scala:380)
    at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:360)
    at scalaj.http.HttpRequest.exec(Http.scala:335)
    at scalaj.http.HttpRequest.execute(Http.scala:323)
    at flinkextension.HttpSource.httpStream(HttpSource.scala:19)
    at flinkextension.HttpSource.run(HttpSource.scala:14)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
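The top frames show where the exception originates: `BufferedSource.getLines()` builds an `InputStreamReader`, and the `java.io.Reader` constructor throws `NullPointerException` when the stream it is given is null. That suggests the `inputStream` passed to the `execute` callback was null. A minimal, Flink-free check of that reading of the trace, assuming plain `scala.io.Source` (the object name `NullStreamRepro` is hypothetical):

```scala
import scala.io.Source
import scala.util.Try

// Hypothetical repro, independent of Flink and scalaj.
object NullStreamRepro {
  // Read lines from a null InputStream, mirroring the top of the trace:
  // getLines -> BufferedSource.reader -> new InputStreamReader -> Reader.<init>.
  def readFromNull(): Try[List[String]] =
    Try(Source.fromInputStream(null).getLines().toList)

  def main(args: Array[String]): Unit =
    // Prints a Failure wrapping java.lang.NullPointerException.
    println(readFromNull())
}
```

This only reproduces the final symptom; it does not explain why the stream is null inside the Flink job.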

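Everything else seems to work fine when I don't use an HTTP request: reading a file through the same InputStream type, running a simple while loop over strings, and even issuing single HTTP requests that are not streaming.
I feel I am missing some theoretical background. Maybe Flink does something in the background that destroys the Http object or the InputStream, but I couldn't find anything about it in the documentation.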
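Since the same loop works over a file stream, one way to isolate the failure is to pull the line-reading loop out of both scalaj and Flink and exercise it on its own. The sketch below assumes a hypothetical helper, `LinePump.pumpLines` (not part of scalaj or Flink), that takes the stream, the cancellation flag as a function, and an emit callback. It treats a null stream as empty instead of throwing, and it decodes with an explicit UTF-8 charset (an assumption; the original code relies on the default codec).

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of scalaj or Flink: the read loop from
// httpStream, decoupled from both so it can run against any InputStream.
object LinePump {
  /** Emits each line of `in` until the stream ends or `isRunning()` is false.
    * Returns the number of lines emitted. A null stream yields 0 lines
    * instead of a NullPointerException. */
  def pumpLines(in: InputStream, isRunning: () => Boolean)(emit: String => Unit): Int =
    if (in == null) 0
    else {
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      try {
        var count = 0
        var line = reader.readLine()
        // Stop once the flag is cleared, similar to takeWhile(_ => isRunning).
        while (line != null && isRunning()) {
          emit(line)
          count += 1
          line = reader.readLine()
        }
        count
      } finally reader.close()
    }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("first\nsecond\n".getBytes(StandardCharsets.UTF_8))
    val n = pumpLines(in, () => true)(line => println(s"emitted: $line"))
    println(s"$n lines")
  }
}
```

Inside the source, the stream argument would be whatever scalaj passes to the `execute` callback, and `emit` would be `ctx.collect`.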
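Update #1:
If I put a null check in the lambda, the job usually exits immediately; sometimes it processes a few elements, and sometimes it times out after hanging for a minute. Here is that version of the httpStream function: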

private def httpStream(f: String => Unit) = {
  val request = Http(url)
  request
    .execute { inputStream =>
      // Check whether scalaj handed over a null body stream.
      if (inputStream == null) println("null inputstream")
      else {
        println("not null inputstream")
        fromInputStream(inputStream)
          .getLines()
          .takeWhile(_ => isRunning)
          .foreach(f)
      }
    }
}

Update #2:
The code actually works in distributed mode, and also with StreamExecutionEnvironment.createLocalEnvironment().
I only run into the problem when I use start-local.sh and submit the jar to it.
