Apache Spark / Delta Lake structured streaming: empty Batch 0 causes NullPointerException

Asked by w46czmvw on 2021-05-17 in Spark

I am trying to read an MQTT stream with Apache Spark and Apache Bahir for MQTT, using a Delta table as the sink. However, when I start the program it always crashes with a NullPointerException. I noticed that the first batch is empty when the program starts, and I suspect this is the cause, but I cannot find a way around it. The code I am using is shown below (with a console-sink isolation test sketched after it):

import spark.implicits._  // required for .as[Array[Byte]]

val topic = "temp"
val brokerUrl = "tcp://localhost:1883"

// Read the MQTT stream via Bahir and decode the binary payload to a string.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic).option("persistence", "memory")
  .load(brokerUrl)
  .select("payload")
  .as[Array[Byte]]
  .map(payload => new String(payload))
  .toDF("payload")

// Write the stream to a Delta table on HDFS.
val query = lines.writeStream
  .format("delta")
  .option("checkpointLocation", "delta/events/_checkpoints/etl-from-json")
  .start("hdfs://localhost:9000/user/admin/delta-table-localhost-02")

query.awaitTermination()
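
To narrow down whether the Delta sink plays any role, the sketch below is what I would try next (it reuses the same lines stream from above; I have not confirmed it avoids the exception): write the stream to the console sink instead of Delta. If the NullPointerException still appears on the empty first batch, the problem is in the MQTT source rather than in the Delta writer.

// Isolation test (sketch): same MQTT source, console sink instead of Delta.
// If the NPE still occurs on Batch 0, the Delta sink is not the cause.
val debugQuery = lines.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugQuery.awaitTermination()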

The stack trace is shown below:

=== Streaming Query ===
Identifier: [id = 9d4b36f4-3d16-4689-a561-f2b00c55ca7f, runId = 2d6cf038-ea50-4186-8614-c59ab943e3e5]
Current Committed Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727]: -1}
Current Available Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727]: 1}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [value#14 AS payload#16]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#14]
   +- MapElements BahirStructuredStreaming$$$Lambda$931/277155095@5aa41140, class [B, [StructField(value,BinaryType,true)], obj#13: java.lang.String
      +- DeserializeToObject cast(payload#2 as binary), obj#12: binary
         +- Project [payload#2]
            +- StreamingExecutionRelation MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727], [id#0, topic#1, payload#2, timestamp#3]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:300)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
    at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.get(MessageStore.scala:119)
    at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.retrieve(MessageStore.scala:143)
    at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$2(MQTTStreamSource.scala:179)
    at scala.collection.MapLike.getOrElse(MapLike.scala:131)
    at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
    at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:625)
    at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$1(MQTTStreamSource.scala:179)
    at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$1$adapted(MQTTStreamSource.scala:178)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
    at scala.collection.TraversableLike.map(TraversableLike.scala:285)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.planInputPartitions(MQTTStreamSource.scala:178)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:150)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:149)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:296)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:296)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:38)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$prepareForExecution$1(QueryExecution.scala:87)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$13(MicroBatchExecution.scala:525)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:516)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more

Process finished with exit code 1
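
From the stack trace, the NullPointerException is thrown inside Bahir's LocalMessageStore while the source plans the partitions for the first micro-batch (committed offset -1, available offset 1), i.e. when it tries to retrieve a message that is not present in the in-memory store. One variant I am considering (a sketch only; the localStorage option name is my reading of the Bahir sql-streaming-mqtt documentation and should be treated as an assumption, and I have not verified that it changes the behaviour) is dropping persistence=memory so the source falls back to file-based persistence:

// Sketch: let the MQTT client use file-based persistence instead of the in-memory store.
// "localStorage" is assumed to point the client's file persistence at a local directory.
val linesWithFilePersistence = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .option("localStorage", "/tmp/mqtt-persistence")  // hypothetical path
  .load(brokerUrl)
  .select("payload")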

No answers yet.
