我想创建一个读取http流的sourcefunction。我使用了scalaj,它做了我想做的事情(它将传入的文本拆分为\n-s)。很明显,代码在flink之外工作,但是每次作为flink作业启动它时,我都会得到一个nullpointerexception(有时在它传输1-2个元素后的1-2秒之后)。看起来http对象有点问题。
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.io.Source.fromInputStream
import scalaj.http._
class HttpSource(url: String) extends SourceFunction[String] {
@volatile var isRunning = true
override def cancel(): Unit = isRunning = false
override def run(ctx: SourceFunction.SourceContext[String]): Unit =
httpStream(ctx.collect)
private def httpStream(f: String => Unit) = {
val request = Http(url)
request
.execute { inputStream =>
fromInputStream(inputStream)
.getLines()
.takeWhile(_ => isRunning)
.foreach(f)
}
}
}
我通常会得到一个例外:(有时会有点不同,例如我试图使请求值变为临时值,然后当它试图引用请求时,它已经为null)
Caused by: java.lang.NullPointerException
at java.io.Reader.<init>(Reader.java:78)
at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
at scala.io.BufferedSource.reader(BufferedSource.scala:24)
at scala.io.BufferedSource.bufferedReader(BufferedSource.scala:25)
at scala.io.BufferedSource.scala$io$BufferedSource$$charReader$lzycompute(BufferedSource.scala:35)
at scala.io.BufferedSource.scala$io$BufferedSource$$charReader(BufferedSource.scala:33)
at scala.io.BufferedSource.scala$io$BufferedSource$$decachedReader(BufferedSource.scala:62)
at scala.io.BufferedSource$BufferedLineIterator.<init>(BufferedSource.scala:67)
at scala.io.BufferedSource.getLines(BufferedSource.scala:86)
at flinkextension.HttpSource$$anonfun$httpStream$1.apply(HttpSource.scala:21)
at flinkextension.HttpSource$$anonfun$httpStream$1.apply(HttpSource.scala:19)
at scalaj.http.HttpRequest$$anonfun$execute$1.apply(Http.scala:323)
at scalaj.http.HttpRequest$$anonfun$execute$1.apply(Http.scala:323)
at scalaj.http.HttpRequest$$anonfun$toResponse$3.apply(Http.scala:388)
at scalaj.http.HttpRequest$$anonfun$toResponse$3.apply(Http.scala:380)
at scala.Option.getOrElse(Option.scala:121)
at scalaj.http.HttpRequest.toResponse(Http.scala:380)
at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:360)
at scalaj.http.HttpRequest.exec(Http.scala:335)
at scalaj.http.HttpRequest.execute(Http.scala:323)
at flinkextension.HttpSource.httpStream(HttpSource.scala:19)
at flinkextension.HttpSource.run(HttpSource.scala:14)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
当我不使用http请求时,其他的一切似乎都可以正常工作,但是使用相同的inputstream类型读取文件、使用字符串执行简单的while循环,甚至当我使用单个http请求时,这些请求都不是流式的。
我觉得我缺少了一些理论背景,也许flink在后台做了一些破坏http对象或inputstream的事情,但是我在文档中没有找到任何东西。
更新#1:
如果我在lambda中放入一个null检查,作业通常会立即退出,有时处理一些元素,有时在挂起一分钟后超时。以下是httpstream函数的这个版本:
private def httpStream(f: String => Unit) = {
val request = Http(url)
request
.execute { inputStream =>
if (inputStream == null) println("null inputstream")
else {
println("not null inputstream")
fromInputStream(inputStream)
.getLines()
.takeWhile(_ => isRunning)
.foreach(f)
}
}
}
更新#2:
该代码实际上在分布式模式下工作,并与streamexecutionenvironment.createlocalenvironment()配合使用
我只有在使用start-local.sh并将jar提交给它时才会遇到这个问题。
暂无答案!
目前还没有任何答案,快来回答吧!