Problem writing Avro records with a schema from the Schema Registry using ABRiS on Spark Structured Streaming

b4qexyjb · Posted 2021-05-27 in Spark

I am trying to consume some Avro records, whose schemas all live in the Confluent Schema Registry, from one Kafka topic, apply a few transformations, and then write the same records to another topic with Spark Structured Streaming. To read the schemas for the source Avro topic I connect to my Confluent Schema Registry with ABRiS - https://github.com/absaoss/abris

So far I can read all the data from the dbserver1.inventory.customers topic with the correct key and value schemas, and my code prints that schema. But when I try to write this data to the customers topic, my to_confluent_avro function does not resolve the key and value schemas, failing with ERROR SchemaManager: Problems found while retrieving metadata for subject 'teste-teste.key'. The code, the error, and my Avro schemas follow:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.to_confluent_avro

object Testkeyy {

  val spark = SparkSession.builder()
    .appName("Our first streams")
    .master("local[2]")
    .getOrCreate()
  val schemaRegistryURL = "http://localhost:8081"
  val topic = "dbserver1.inventory.customers"

  val commonRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic,
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> schemaRegistryURL
  )

  val keyRegistryConfig = commonRegistryConfig ++ Map(
    SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
    SchemaManager.PARAM_KEY_SCHEMA_ID -> "latest",
    SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key"
  )

  val valueRegistryConfig = commonRegistryConfig ++ Map(
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
  )

  def readFromAvro() = {

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dbserver1.inventory.customers")
      .option("startingOffsets", "earliest")
      .load()

    val data = df.select(
      from_confluent_avro(col("key"), keyRegistryConfig).as("key"),
      from_confluent_avro(col("value"), valueRegistryConfig).as("value"))
    data.printSchema

    val config = Map(
      SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "customers",
      SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
      SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key",
      SchemaManager.PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "customers"
    )

    val configkey = config ++ Map(
      SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name")
    //  SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME)

    val configvalue = config ++ Map(
      SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name"
    )

    data.select(
      to_confluent_avro(col("key"), configkey).as("key"),
      to_confluent_avro(col("value"), configvalue).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "customers")
      .outputMode("append")
      .option("checkpointLocation", "checkpoints")// append and update not supported on aggregations without watermark
      .start()
      .awaitTermination()
  }

  def main(args: Array[String]): Unit = readFromAvro()
}
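
One debugging step that helps with the subject lookup failure below: list which subjects the registry actually holds and compare them with the subject name ABRiS derives from the configured naming strategy. A minimal sketch, assuming the Confluent schema-registry client already on the classpath (the object name ListSubjects is just an illustration):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

object ListSubjects {
  def main(args: Array[String]): Unit = {
    // 32 is an arbitrary identity-map cache size
    val client = new CachedSchemaRegistryClient("http://localhost:8081", 32)
    // Prints every registered subject, e.g. "customers-key" or
    // "dbserver1.inventory.customers-value", for comparison with the
    // subject name the chosen naming strategy produces
    client.getAllSubjects.forEach(s => println(s))
  }
}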

The error:

ERROR SchemaManager: Problems found while retrieving metadata for subject 'customers-key.key'
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'teste-teste.key' not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:515)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:507)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.$anonfun$exists$1(SchemaManager.scala:123)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.exists(SchemaManager.scala:123)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:110)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
    at scala.Option.orElse(Option.scala:447)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
    at scala.Option.flatMap(Option.scala:271)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/07/05 00:58:10 ERROR Utils: Aborting task
org.apache.avro.SchemaParseException: Can't redefine: teste.key
    at org.apache.avro.Schema$Names.put(Schema.java:1511)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.avro.Schema.toString(Schema.java:382)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:112)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
    at scala.Option.orElse(Option.scala:447)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
    at scala.Option.flatMap(Option.scala:271)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/07/05 00:58:10 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
20/07/05 00:58:10 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 0, attempt 0, stage 0.0)
20/07/05 00:58:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.avro.SchemaParseException: Can't redefine: teste.key
    at org.apache.avro.Schema$Names.put(Schema.java:1511)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.avro.Schema.toString(Schema.java:382)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:112)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.registerSchema(CatalystDataToAvro.scala:73)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$2(CatalystDataToAvro.scala:39)
    at scala.Option.orElse(Option.scala:447)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.$anonfun$schemaId$1(CatalystDataToAvro.scala:39)
    at scala.Option.flatMap(Option.scala:271)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId$lzycompute(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.schemaId(CatalystDataToAvro.scala:37)
    at za.co.absa.abris.avro.sql.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:56)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
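
The second failure, org.apache.avro.SchemaParseException: Can't redefine: teste.key, is Avro refusing a schema in which the same fully qualified record name is defined more than once. A minimal illustration of that rule (this schema is a made-up example, not one of mine):

import org.apache.avro.Schema

object RedefineDemo {
  def main(args: Array[String]): Unit = {
    // Two record definitions share the full name "teste.key" inside one
    // schema, so the parser throws SchemaParseException: Can't redefine: teste.key
    val bad =
      """{"type": "record", "name": "key", "namespace": "teste", "fields": [
        |  {"name": "a", "type": {"type": "record", "name": "key",
        |                         "namespace": "teste", "fields": []}}
        |]}""".stripMargin
    new Schema.Parser().parse(bad)
  }
}

My schemas each define their record only once, so the duplicate name presumably appears while the writer-side schema is being assembled for registration.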

My customers key and value Avro schemas (exactly the same as the schemas for dbserver1.inventory.customers):

{
  "type": "record",
  "name": "Key",
  "namespace": "customers",
  "fields": [
    {
      "name": "id",
      "type": "int"
    }
  ],
  "connect.name": "customers.Key"
}

{
  "type": "record",
  "name": "Value",
  "namespace": "customers",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "first_name",
      "type": "string"
    },
    {
      "name": "last_name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    }
  ],
  "connect.name": "customers.Value"
}
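
If the subjects simply do not exist yet, one workaround is to register the schemas up front under the names the configured strategy will look up (with the topic.name strategy that is "<topic>-key" and "<topic>-value"). A minimal sketch, assuming the pre-5.5 Confluent client that accepts an org.apache.avro.Schema, which matches the stack trace above; the object name and the literal subject "customers-key" are mine:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

object RegisterCustomerKey {
  def main(args: Array[String]): Unit = {
    val client = new CachedSchemaRegistryClient("http://localhost:8081", 32)

    // The same Key schema as above, parsed from its JSON definition
    val keySchema = new Schema.Parser().parse(
      """{"type": "record", "name": "Key", "namespace": "customers",
        |  "fields": [{"name": "id", "type": "int"}]}""".stripMargin)

    // With the topic.name strategy the writer looks up "customers-key";
    // register(subject, schema) returns the new schema id
    val id = client.register("customers-key", keySchema)
    println(s"registered customers-key with id $id")
  }
}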

No answers yet.
