我正在尝试使用 Apache Spark 和 Apache Bahir 的 MQTT 连接器读取 MQTT 流,并使用一张 Delta 表作为接收端(sink)。但是,当我启动程序时,它总是崩溃,并抛出空指针异常(NullPointerException)。我注意到当程序启动时,第一批是空的。我怀疑这就是问题的原因,但我似乎找不到解决这个问题的方法。我使用的代码如下所示:
// MQTT topic to subscribe to and the broker the source connects to.
val topic = "temp"
val brokerUrl = "tcp://localhost:1883"

// Streaming source: read from the Bahir MQTT provider, keep only the binary
// `payload` column, and decode each message into a String column named "payload".
// The bytes are decoded explicitly as UTF-8 instead of relying on the JVM's
// platform-default charset, so the result does not depend on the `file.encoding`
// of whichever host runs the task.
// NOTE(review): assumes publishers send UTF-8 text; adjust the charset if not.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .option("persistence", "memory")
  .load(brokerUrl)
  .select("payload")
  .as[Array[Byte]]
  .map(payload => new String(payload, java.nio.charset.StandardCharsets.UTF_8))
  .toDF("payload")

// Streaming sink: write the decoded payloads in "delta" format to the HDFS path
// below, using the given checkpoint location.
// NOTE(review): the NullPointerException in the stack trace is raised inside
// Bahir's LocalMessageStore.get, i.e. in the source, not in this decoding step;
// this snippet does not by itself address it.
val query = lines.writeStream
  .format("delta")
  .option("checkpointLocation", "delta/events/_checkpoints/etl-from-json")
  .start("hdfs://localhost:9000/user/admin/delta-table-localhost-02")

// Block the calling thread until the streaming query stops or fails.
query.awaitTermination()
您可以看到下面的堆栈跟踪:
=== Streaming Query ===
Identifier: [id = 9d4b36f4-3d16-4689-a561-f2b00c55ca7f, runId = 2d6cf038-ea50-4186-8614-c59ab943e3e5]
Current Committed Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727]: -1}
Current Available Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727]: 1}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [value#14 AS payload#16]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#14]
+- MapElements BahirStructuredStreaming$$$Lambda$931/277155095@5aa41140, class [B, [StructField(value,BinaryType,true)], obj#13: java.lang.String
+- DeserializeToObject cast(payload#2 as binary), obj#12: binary
+- Project [payload#2]
+- StreamingExecutionRelation MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho428547487727], [id#0, topic#1, payload#2, timestamp#3]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:300)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.get(MessageStore.scala:119)
at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.retrieve(MessageStore.scala:143)
at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$2(MQTTStreamSource.scala:179)
at scala.collection.MapLike.getOrElse(MapLike.scala:131)
at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:625)
at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$1(MQTTStreamSource.scala:179)
at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.$anonfun$planInputPartitions$1$adapted(MQTTStreamSource.scala:178)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.planInputPartitions(MQTTStreamSource.scala:178)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:150)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:149)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:296)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:278)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:296)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:38)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$prepareForExecution$1(QueryExecution.scala:87)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$13(MicroBatchExecution.scala:525)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:516)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Process finished with exit code 1
暂无答案!
目前还没有任何答案,快来回答吧!