PySpark: java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.EventHubException error when reading events from EventHub in Databricks

Posted by xghobddn on 2023-10-15 in Spark

Following the steps documented for Structured Streaming with PySpark, I am able to create the connection, but I cannot read any data.
Error:

Stream stopped...
java.util.concurrent.ExecutionException: com.microsoft.azure.eventhubs.EventHubException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
    at org.apache.spark.sql.eventhubs.EventHubsSource.partitionCount(EventHubsSource.scala:81)
    at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$4(EventHubsSource.scala:96)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.eventhubs.EventHubsSource.$anonfun$maxOffsetsPerTrigger$2(EventHubsSource.scala:96)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:100)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:197)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:196)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:165)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:165)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:349)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: com.microsoft.azure.eventhubs.EventHubException
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:74)
    at com.microsoft.azure.eventhubs.impl.ReceiveLinkHandler.onLinkFinal(ReceiveLinkHandler.java:81)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:182)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Code:

from pyspark.sql.types import StructType, StructField, StringType
import pyspark.sql.functions as F

# `spark` and `sc` are predefined in Databricks notebooks.

# Event Hubs namespace name, shared access policy name, and its key (redacted)
NAMESPACE_NAME = "<>"
KEY_NAME = "<>"
KEY_VALUE = "<>"

# The connection string to the "testing" Event Hub in your Event Hubs namespace
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=testing".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)

ehConf = {}
# ehConf['eventhubs.connectionString'] = connectionString

# For connector version 2.3.15 and above, the configuration dictionary requires the connection string to be encrypted.
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

read_df = spark.readStream.format("eventhubs").options(**ehConf).load()

# Define the schema of the JSON payload
read_schema = StructType([
    StructField("Color", StringType(), True),
    StructField("Value", StringType(), True),
])

# Cast the binary "body" column to a string and parse it as JSON
decoded_df = read_df.select(F.from_json(F.col("body").cast("string"), read_schema).alias("payload"))

# Write the parsed stream out as JSON files
query1 = (
    decoded_df.writeStream
    .format("json")
    .option("path", "/user/sh/JsonStream/")
    .option("checkpointLocation", "/user/sh/EventHubCheckPoint/")
    .queryName("read_hub")
    .start()
)
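The stack trace shows the failure inside `EventHubsClient.partitionCount`, which the connector calls while the streaming source is being created, so a misconfigured connection (for example a malformed connection string) is one thing worth ruling out alongside the dependency. The sketch below is not from the original thread: it is plain Python (no Spark needed), and `build_connection_string`, `parse_connection_string` and `check_connection_string` are hypothetical helpers. It assumes the `Endpoint=...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...` shape used in the question's code.

```python
from typing import Dict

# Keys expected in the connection string built by the question's code (assumption).
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")


def build_connection_string(namespace: str, key_name: str, key_value: str, entity_path: str) -> str:
    """Hypothetical helper: assemble the string in the same shape as the question."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key_value};"
        f"EntityPath={entity_path}"
    )


def parse_connection_string(conn: str) -> Dict[str, str]:
    """Split 'Key=Value;' pairs. Split on the first '=' only, because
    base64-encoded keys often end in '=' padding."""
    parts: Dict[str, str] = {}
    for segment in conn.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


def check_connection_string(conn: str) -> None:
    """Raise ValueError if the string is obviously incomplete; return None otherwise."""
    if "<" in conn or ">" in conn:
        raise ValueError("a '<>' placeholder is still present")
    parts = parse_connection_string(conn)
    missing = [k for k in REQUIRED_KEYS if not parts.get(k)]
    if missing:
        raise ValueError(f"missing or empty keys: {missing}")
    endpoint = parts["Endpoint"]
    if not (endpoint.startswith("sb://") and endpoint.rstrip("/").endswith(".servicebus.windows.net")):
        raise ValueError(f"unexpected endpoint: {endpoint!r}")
```

In the notebook you could call `check_connection_string(connectionString)` before encrypting it. A clean pass only shows the string is well-formed; it does not prove the key, policy, or Event Hub name are valid.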

Please help me figure out where I went wrong.
Note: I have added com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.20, and I am using DBR 8.4 with Spark 3.1.2 and Scala 2.12.

Answer #1 (by 6ojccjat)

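This is most likely caused by an incorrect dependency. DBR 7.x and 8.x run Scala 2.12, so you need azure-eventhubs-spark_2.12 instead of azure-eventhubs-spark_2.11. The error itself is a bit unusual, though: an incorrect Scala version usually causes different errors.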
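The `_2.11` / `_2.12` suffix on a Maven artifactId is the Scala binary version the library was compiled for, and it has to match the cluster's Scala version (2.12 on DBR 8.4, per the question). The pure-Python sketch below illustrates that check; `scala_binary_suffix`, `with_scala_suffix` and `check_scala_compat` are hypothetical helpers, not part of any Databricks or Maven API.

```python
import re

# Matches an artifactId ending in a Scala binary suffix such as "_2.11" or "_2.12".
_SUFFIX = re.compile(r"^(?P<base>.+)_(?P<scala>\d+\.\d+)$")


def _split(coordinate: str):
    """Split 'group:artifact_X.Y:version' into (group, artifact base, 'X.Y', version)."""
    parts = coordinate.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected group:artifact:version, got {coordinate!r}")
    group, artifact, version = parts
    m = _SUFFIX.match(artifact)
    if m is None:
        raise ValueError(f"artifactId has no Scala suffix: {artifact!r}")
    return group, m.group("base"), m.group("scala"), version


def scala_binary_suffix(coordinate: str) -> str:
    return _split(coordinate)[2]


def with_scala_suffix(coordinate: str, scala_binary: str) -> str:
    """Rewrite only the Scala suffix; the version part is left untouched."""
    group, base, _, version = _split(coordinate)
    return f"{group}:{base}_{scala_binary}:{version}"


def check_scala_compat(coordinate: str, cluster_scala_version: str) -> None:
    """cluster_scala_version may be a full version like '2.12.10'; only major.minor is compared."""
    cluster_binary = ".".join(cluster_scala_version.split(".")[:2])
    suffix = scala_binary_suffix(coordinate)
    if suffix != cluster_binary:
        raise ValueError(
            f"{coordinate} is built for Scala {suffix}, but the cluster runs Scala {cluster_binary}; "
            f"try {with_scala_suffix(coordinate, cluster_binary)}"
        )
```

On a running cluster the Scala version can typically be read from the JVM, e.g. `sc._jvm.scala.util.Properties.versionNumberString()`, and passed in as `cluster_scala_version`. The helper only swaps the suffix, so it is still worth confirming on Maven Central that the chosen version is published for `_2.12`.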
