I am trying to use Spark Continuous Streaming with Kafka as the read source, in a project that uses Schema Registry. My Kafka messages are SpecificAvroRecord instances [each of which carries a schema field (org.apache.avro.Schema)].
As far as I know, Spark does not support self-referential bean types. So what is the best way to integrate Kafka Schema Registry [specific records] with Spark Continuous Streaming?
My test code:
```java
SparkSession spark = SparkSession.builder().appName("testpro")
        .master("local[2]").getOrCreate();
Dataset<Row> df = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
        .option("subscribe", "testTopic")
        .option("startingOffsets", "latest").load();
// Encoders.bean(Data.class) is what throws: Spark reflects over the bean
// and finds the inherited org.apache.avro.Schema property.
Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)")
        .map((MapFunction<Row, Data>) row -> new Data(row.getString(0)), Encoders.bean(Data.class));
```
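To spell out why (as far as I can tell) this fails: Encoders.bean() has Spark's JavaTypeInference walk the bean's getters recursively; Data inherits getSchema() from SpecificRecordBase, org.apache.avro.Schema's own properties refer back to Schema, and the inference gives up with the circular-reference error below. A minimal illustration of my understanding (SchemaHolder is a made-up class; I have not isolated it exactly like this):

```java
// Hypothetical minimal reproduction (my assumption): any bean property typed
// org.apache.avro.Schema should trip the same circular-reference check,
// independent of Kafka.
public class SchemaHolder {
    private org.apache.avro.Schema schema;
    public org.apache.avro.Schema getSchema() { return schema; }
    public void setSchema(org.apache.avro.Schema schema) { this.schema = schema; }
}
// Encoders.bean(SchemaHolder.class) -> UnsupportedOperationException:
// "Cannot have circular references in bean class ..."
```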
The message class:
```java
public class Data extends SpecificAvroRecord {
    private static final long serialVersionUID = 1L;
    private String firstName;
    private String lastName;
    public String name;

    public Data() {}
    // ... setters, getters, and a String constructor
}
```
SpecificAvroRecord:
```java
public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
    protected Schema schema;
}
```
The exception:

```
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)
```
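For reference, the direction I am currently experimenting with, as an untested sketch: keep the Kafka value as raw bytes, deserialize it with Confluent's KafkaAvroDeserializer inside the map, and hand Spark only the schema-free bean. The registry URL is a placeholder and DataDto is the hypothetical bean sketched above.

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

import java.util.Collections;

public class AvroDecode {
    // Created lazily in each executor JVM; KafkaAvroDeserializer is not serializable.
    private static KafkaAvroDeserializer deserializer;

    private static KafkaAvroDeserializer deserializer() {
        if (deserializer == null) {
            deserializer = new KafkaAvroDeserializer();
            // Placeholder registry URL -- replace with the real one.
            deserializer.configure(
                    Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                    false /* isKey */);
        }
        return deserializer;
    }

    static Dataset<DataDto> decode(Dataset<Row> df) {
        return df.select("value") // raw Avro bytes from the Kafka source
                .map((MapFunction<Row, DataDto>) row -> {
                    GenericRecord record = (GenericRecord)
                            deserializer().deserialize("testTopic", (byte[]) row.get(0));
                    DataDto dto = new DataDto();
                    dto.setFirstName(String.valueOf(record.get("firstName")));
                    dto.setLastName(String.valueOf(record.get("lastName")));
                    dto.setName(String.valueOf(record.get("name")));
                    return dto;
                }, Encoders.bean(DataDto.class));
    }
}
```

Whether this is the recommended pattern, and whether it behaves well under the continuous-processing trigger, is essentially what I am asking.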