我用spark来读Kafka的主题。
这是我的密码:
val df = spark.readStream.format("kafka").
option("kafka.bootstrap.servers", "myendpoint.servicebus.windows.net:9093").
option("kafka.security.protocol", "SASL_SSL").
option("subscribe", "twitter").
option("kafka.sasl.mechanism", "PLAIN").
option("startingOffsets", "earliest").
load()
val rawdata = df.selectExpr("CAST(value AS STRING)").as[String]
val query = rawdata.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
错误如下:
java.util.concurrent.TimeoutException: Cannot fetch record for offset 446451 in 120000 milliseconds
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:151)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:142)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.foreach(WholeStageCodegenExec.scala:612)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2.scala:133)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2.scala:132)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2.scala:138)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$2.apply(WriteToDataSourceV2.scala:79)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$2.apply(WriteToDataSourceV2.scala:78)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
一些附加信息:我正在使用azurehdinsight spark环境,它使用kafka库连接到事件中心。
spark版本: 2.3.2.2.6.5.3026-7
hdinsight版本: 3.6.1000.67
eventhub分区计数: 8
Kafka图书馆版本:
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2,
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,
org.apache.kafka:kafka-clients:0.10.2.1
暂无答案!
目前还没有任何答案,快来回答吧!