我正在尝试从一个kafka主题中读取一些avro记录(其模式都保存在confluent schema registry中),进行一些转换,然后使用spark structured streaming把相同的记录写入另一个主题。为了读取这个avro主题的模式,我使用ABRiS连接我的Confluent模式注册表——https://github.com/absaoss/abris
所以,现在,我可以正常地从dbserver1.inventory.customers主题中读取具有正确的键和值模式的所有数据。在我的代码中,我打印了这个模式。但是,当我尝试把这些数据写入customers主题时,我的to_confluent_avro函数无法识别键和值的模式,错误为 ERROR SchemaManager: 检索主题“teste-teste.key”的元数据时发现问题。下面是代码、错误和我的avro模式:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.to_confluent_avro
// NOTE(review): code pasted from a question and truncated (closing braces are
// missing in the paste). Reads Confluent-Avro records from one Kafka topic via
// ABRiS, decodes them, then re-encodes and streams them into another topic.
object Testkeyy {
// Local Spark session for the structured-streaming job (2 worker threads).
val spark = SparkSession.builder()
.appName("Our first streams")
.master("local[2]")
.getOrCreate()
// Confluent Schema Registry endpoint and the source topic.
val schemaRegistryURL = "http://localhost:8081"
val topic = "dbserver1.inventory.customers"
// Registry settings shared by the key and value reader configs.
val commonRegistryConfig = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic,
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> schemaRegistryURL
)
// Key schema is resolved with the topic.record.name subject strategy,
// always taking the latest registered version.
val keyRegistryConfig = commonRegistryConfig ++ Map(
SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
SchemaManager.PARAM_KEY_SCHEMA_ID -> "latest",
SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key"
)
// Value schema uses the topic.name strategy (subject "<topic>-value").
val valueRegistryConfig = commonRegistryConfig ++ Map(
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
)
// Reads the source topic, decodes key/value from the Confluent Avro wire
// format, then re-encodes both columns and writes the stream to "customers".
def readFromAvro() = {
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "dbserver1.inventory.customers")
.option("startingOffsets", "earliest")
.load()
// Decode the binary key/value columns using the reader configs above.
val data = df.select(
from_confluent_avro(col("key"), keyRegistryConfig).as("key")
,from_confluent_avro(df.col("value"), valueRegistryConfig) as 'value)
data.printSchema
// Writer-side registry config targeting the destination topic "customers".
// NOTE(review): the key naming strategy chosen below must produce a subject
// name that matches what is (or can be) registered in the registry; the
// reported error ("Subject ... not found" followed by "Can't redefine")
// points at a subject/record-name mismatch — verify against the registry.
val config = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "customers",
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key",
SchemaManager.PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "customers"
)
// Key is written with topic.record.name; note this differs from the
// reader-side value strategy and from the commented-out TOPIC_NAME variant.
val configkey = config ++ Map(
SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name")
// SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME)
// Value is written with the plain topic.name strategy.
val configvalue = config ++ Map(
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name"
)
// Re-encode to Confluent Avro and stream the rows back into Kafka.
data.select(
to_confluent_avro(col("key"), configkey).as("key"),
to_confluent_avro(col("value"), configvalue).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "customers")
.outputMode("append")
.option("checkpointLocation", "checkpoints")// append and update not supported on aggregations without watermark
.start()
错误:
ERROR SchemaManager: Problems found while retrieving metadata for subject 'customers-key.key'
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'teste-teste.key' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:515)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:507)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
at za.co.absa.abris.avro.read.confluent.SchemaManager.$anonfun$exists$1(SchemaManager.scala:123)
at scala.util.Try$.apply(Try.scala:213)
at za.co.absa.abris.avro.read.confluent.SchemaManager.exists(SchemaManager.scala:123)
at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:110)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
at scala.Option.orElse(Option.scala:447)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
at scala.Option.flatMap(Option.scala:271)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/07/05 00:58:10 ERROR Utils: Aborting task
org.apache.avro.SchemaParseException: Can't redefine: teste.key
at org.apache.avro.Schema$Names.put(Schema.java:1511)
at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
at org.apache.avro.Schema.toString(Schema.java:396)
at org.apache.avro.Schema.toString(Schema.java:382)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:112)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
at scala.Option.orElse(Option.scala:447)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
at scala.Option.flatMap(Option.scala:271)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/07/05 00:58:10 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
20/07/05 00:58:10 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 0, attempt 0, stage 0.0)
20/07/05 00:58:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.avro.SchemaParseException: Can't redefine: teste.key
at org.apache.avro.Schema$Names.put(Schema.java:1511)
at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
at org.apache.avro.Schema.toString(Schema.java:396)
at org.apache.avro.Schema.toString(Schema.java:382)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:112)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
at scala.Option.orElse(Option.scala:447)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
at scala.Option.flatMap(Option.scala:271)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我的customers key和customers value的avro模式(与dbserver1.inventory.customers的模式完全相同):
{
"type": "record",
"name": "Key",
"namespace": "customers",
"fields": [
{
"name": "id",
"type": "int"
}
],
"connect.name": "customers.Key"
}
{
"type": "record",
"name": "Value",
"namespace": "customers",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "first_name",
"type": "string"
},
{
"name": "last_name",
"type": "string"
},
{
"name": "email",
"type": "string"
}
],
"connect.name": "customers.Value"
}
暂无答案!
目前还没有任何答案,快来回答吧!