Kafka Schema Registry & Spark continuous streaming integration, exception: circular reference of class org.apache.avro.Schema

knpiaxh1 · posted 2021-05-24 in Spark

I am trying to use Spark continuous streaming with Kafka as the read source for a project, together with the Schema Registry. My Kafka messages are instances of SpecificAvroRecord [which has a schema field (org.apache.avro.Schema)].
As far as I know, Spark does not support self-referencing bean types. So what is the best way to integrate the Kafka Schema Registry [SpecificRecord] with Spark continuous streaming?
My test code:

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("testpro")
    .master("local[2]").getOrCreate();

Dataset<Row> df = spark.readStream()
    .format("kafka").option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
    .option("subscribe", "testTopic")
    .option("startingOffsets", "latest").load();

// Encoders.bean(Data.class) is where the circular-reference exception is thrown
Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)")
    .map((MapFunction<Row, Data>) value -> new Data(value.toString()), Encoders.bean(Data.class));
```

The message class:

```java
public class Data extends SpecificAvroRecord {

  private static final long serialVersionUID = 1L;
  private String firstName;
  private String lastName;
  public String name;

  public Data() {}
  // ... setters, getters, and a String constructor
}
```

The SpecificAvroRecord base class:

```java
public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
  protected Schema schema;
}
```
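For context on why this fails: Encoders.bean inspects every bean property of Data, including the Schema getSchema() getter that SpecificRecordBase inherits from Avro's GenericContainer. org.apache.avro.Schema itself exposes getters such as getElementType() whose return type is Schema again, so Spark's JavaTypeInference detects a cycle and throws the exception below.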

The exception:

```
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
    at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)
```
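
One commonly suggested workaround (a minimal sketch, not a verified answer): keep the value column as raw bytes — CAST(value AS STRING) would mangle the Confluent wire format (one magic byte, a 4-byte schema ID, then Avro binary) — deserialize it with Confluent's KafkaAvroDeserializer inside map(), and copy the record into a flat bean with no Schema-typed property so that Encoders.bean can infer its schema. The PlainData bean and the registry URL below are assumptions, not part of the original post:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical flat bean: same string fields as Data, but no Schema-typed
// property, so Encoders.bean can infer its schema without hitting a cycle.
public class PlainData implements Serializable {
  private String firstName, lastName, name;
  public PlainData() {}
  public PlainData(String firstName, String lastName, String name) {
    this.firstName = firstName; this.lastName = lastName; this.name = name;
  }
  // getters and setters (required by Encoders.bean) omitted for brevity
}

// Given the df from the test code above, with the value column left as bytes:
Dataset<PlainData> messages = df.map((MapFunction<Row, PlainData>) row -> {
  // KafkaAvroDeserializer is not Serializable, so construct it inside the task;
  // a real job would cache one instance per executor (e.g. in a static holder).
  Map<String, Object> conf = new HashMap<>();
  conf.put("schema.registry.url", "http://schema-registry:8081"); // assumed URL
  conf.put("specific.avro.reader", true);
  KafkaAvroDeserializer deser = new KafkaAvroDeserializer();
  deser.configure(conf, false); // false = configuring a value deserializer
  Data record = (Data) deser.deserialize("testTopic", (byte[]) row.getAs("value"));
  return new PlainData(record.getFirstName(), record.getLastName(), record.name);
}, Encoders.bean(PlainData.class));
```

Alternatively, Spark 3's spark-avro from_avro function can decode the payload without a bean encoder, but it expects plain Avro binary, so the 5-byte Schema Registry header would have to be stripped first.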

No answers yet.
