How to use the Confluent Schema Registry with the from_avro standard function?

y1aodyip · asked 2021-06-04 · Kafka

This question already has answers here:

Integrating Spark Structured Streaming with Confluent Schema Registry (7 answers)
Closed last year.
My Kafka and Schema Registry are based on Confluent Community Platform 5.2.2, and my Spark version is 2.4.4. I start the Spark REPL environment with:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
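(For reference: the RestService used further below comes from the Confluent schema-registry client, which the packages above do not pull in; presumably it can be added along these lines, assuming Confluent's Maven repository and the 5.2.2 client artifact:)

./bin/spark-shell \
  --repositories https://packages.confluent.io/maven/ \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4,io.confluent:kafka-schema-registry-client:5.2.2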

and set up the Kafka source for the Spark session:

val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name" 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()

Then I fetch the schema information for the key and the value:

import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")

First, if I query the "key" with writeStream, i.e.:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch ${batchId}, ${batchDF.count} rows")
    //rstDF.select("value").show
    rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()

There are no errors, and the row count is even shown, but I do not get any data:

2019-09-16T10:30:16.984 --- Batch 0, 0 rows
+---+
|key|
+---+
+---+

2019-09-16T10:32:00.401 --- Batch 1, 27 rows
+---+
|key|
+---+
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
+---+
only showing top 20 rows

But if I select the "value":

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch ${batchId}, ${batchDF.count} rows")
    rstDF.select("value").show
    //rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()

I get this error:

2019-09-16T10:34:54.287 --- Batch 0, 0 rows
+-----+
|value|
+-----+
+-----+

2019-09-16T10:36:00.416 --- Batch 1, 19 rows
19/09/16 10:36:03 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

So I think there are two levels to the problem:
First, the Avro deserialization logic differs between the key and the value; the current from_avro seems to work only for the key, not for the value.
Second, even for the key, although there is no error, the from_avro deserializer cannot recover the real data.
Do you think any of my steps are wrong? Or do from_avro / to_avro need to be enhanced?
Thanks.

9o685dep

Your keys and values are not plain Avro: they are raw byte arrays prefixed with the schema ID as an integer (the Confluent wire format). spark-avro does not support that format; it only supports the "Avro container object" format, which carries the schema as part of the record.
In other words, you need to call the Confluent deserializers (not the "plain Avro" ones) to get Avro objects first, and then you can apply a schema to those.
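For example, a minimal sketch of the prefix-stripping approach with the spark-avro from_avro already used above (this assumes every record was produced with the same schema version fetched earlier; stripConfluentHeader is a hypothetical helper, not part of Spark or Confluent):

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.udf

// Confluent wire format: 1 magic byte + 4-byte schema ID + Avro payload.
// Drop the first 5 bytes so that spark-avro sees a plain binary-encoded Avro record.
val stripConfluentHeader = udf { bytes: Array[Byte] =>
  if (bytes == null) null else bytes.drop(5)
}

val decodedDF = batchDF.select(
  from_avro(stripConfluentHeader($"key"), keyRestResponseSchemaStr).as("key"),
  from_avro(stripConfluentHeader($"value"), valueRestResponseSchemaStr).as("value"))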
Should Spark's from_avro / to_avro be enhanced?
They should be, but they won't be; see SPARK-26314. Note that Databricks does offer a Schema Registry integration through functions with the same names, which only adds to the confusion.
The workaround is to use this library: https://github.com/absaoss/abris
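A minimal sketch of what the ABRiS route might look like (assuming a recent ABRiS release with the AbrisConfig builder API; check the project README for the exact version and dependency coordinates):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Fetch the latest value schema for the topic from Schema Registry and let ABRiS
// handle the Confluent wire format (magic byte + schema ID + Avro payload).
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my_kafka_topic_name")           // isKey defaults to false, i.e. the value schema
  .usingSchemaRegistry("http://my_confluent_server:8081")

val decodedDF = df.select(from_avro(col("value"), abrisConfig).as("value"))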
Or see the other solutions in Integrating Spark Structured Streaming with Confluent Schema Registry.
