I am trying to run a Spark DStream job that reads from Kafka and deserializes the records using the Confluent Avro Schema Registry. We configured Schema Registry basic authentication and pass the configuration when creating KafkaUtils.createDirectStream:
val kafkaConfig = Map(
  "basic.auth.credentials.source" -> "URL",
  "key.deserializer"              -> classOf[KafkaAvroDeserializer],
  "value.deserializer"            -> classOf[KafkaAvroDeserializer],
  "bootstrap.servers"             -> "localhost:9092",
  "schema.registry.url"           -> "http://username:password@localhost:8081"
)

val stream = KafkaUtils.createDirectStream[String, GenericData.Record](
  ssc,
  LocationStrategies.PreferConsistent,
  Subscribe[String, GenericData.Record](topics, kafkaConfig)
)
This works fine when I run it from IntelliJ, but when I submit the same job with spark-submit I get a Schema Registry unauthorized error. Has anyone faced the same issue?
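For reference, the Confluent client also supports supplying the credentials separately from the URL via the USER_INFO credentials source. Below is a minimal sketch of the same stream configured that way; the "basic.auth.user.info" property name and whether this changes the spark-submit behaviour are assumptions, and ssc/topics are taken to be defined as in the snippet above.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericData
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}

// Sketch: pass the Schema Registry credentials via USER_INFO instead of
// embedding them in schema.registry.url. Whether this resolves the
// executor-side 401 under spark-submit is an assumption.
val kafkaConfig = Map[String, Object](
  "bootstrap.servers"             -> "localhost:9092",
  "key.deserializer"              -> classOf[KafkaAvroDeserializer],
  "value.deserializer"            -> classOf[KafkaAvroDeserializer],
  "schema.registry.url"           -> "http://localhost:8081",
  "basic.auth.credentials.source" -> "USER_INFO",
  "basic.auth.user.info"          -> "username:password"
)

val stream = KafkaUtils.createDirectStream[String, GenericData.Record](
  ssc,
  LocationStrategies.PreferConsistent,
  Subscribe[String, GenericData.Record](topics, kafkaConfig)
)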
Error trace:
21/02/21 16:03:42 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic_name-2 at offset 110. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 161
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:479)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1261)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1488)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1328)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:641)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:602)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:206)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:135)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:39)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:38)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:224)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I am using Spark 3.0 and Schema Registry 5.3.1.
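For completeness, here is a rough sketch of the build dependencies such a job is typically compiled against; the exact artifact versions and the Scala binary version are assumptions based only on the Spark 3.0 / Schema Registry 5.3.1 combination mentioned above, not taken from the question.

// build.sbt (sketch; versions are assumptions)
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "3.0.0" % Provided,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.0.0",
  "io.confluent"      % "kafka-avro-serializer"      % "5.3.1",
  "org.apache.avro"   % "avro"                       % "1.8.2"
)

Note that with spark-submit only the Spark distribution itself is provided by the cluster, so the Kafka and Confluent dependencies have to reach the executor classpath some other way, for example via an assembly/uber jar or --packages.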