flink-avroruntimeexception:不是一个特定的类

yquaqz18  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(472)

我尝试在一个交互式flink scala shell中使用合流模式注册表来开始使用flink的当前0.10.1版本。这里有更多的上下文https://github.com/geoheil/streaming-reference/tree/5-basic-flink-setup
我的问题是尝试从 ConfluentRegistryAvroDeserializationSchema 失败:

val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
error: type arguments [Tweet] conform to the bounds of none of the overloaded alternatives of
value forSpecific: [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String, x$3: Int)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T] <and> [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]

shell的设置如下所示(即,为avro或schema registry支持添加的附加jar如下所示:):

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.10.1/flink-connector-kafka_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.10.1/flink-connector-kafka-base_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.10.1/flink-avro-confluent-registry-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.10.1/flink-avro-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.10.1/force-shading-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/

export TERM=xterm-color
./bin/start-scala-shell.sh local

我尝试执行以下代码段:

import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties

senv.enableCheckpointing(5000)

final case class Tweet(tweet_id: Option[String], text: Option[String], source: Option[String], geo: Option[String], place: Option[String], lang: Option[String], created_at: Option[String], timestamp_ms: Option[String], coordinates: Option[String], user_id: Option[Long], user_name: Option[String], screen_name: Option[String], user_created_at: Option[String], followers_count: Option[Long], friends_count: Option[Long], user_lang: Option[String], user_location: Option[String], hashtags: Option[Seq[String]])

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/no-subject-td36269.html 以及http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/avro-from-avrohugger-still-invalid-td36274.html 是flinks邮件列表的链接

编辑

我发现的第一个提示是:https://github.com/zladovan/gradle-avrohugger-plugin 在生成类时,我需要将case类更改为avro specfic或generic record之一。但我也在努力让它工作。
这个 case class Tweet 上面的例子是从https://github.com/geoheil/streaming-reference/blob/5-basic-flink-setup/common/models/src/main/avro/tweet.avsc 使用https://github.com/zladovan/gradle-avrohugger-plugin 在标准(即案例类)模式下。
然而,它需要转移到 SpecificRecord 格式https://github.com/zladovan/gradle-avrohugger-plugin#source-格式有一个 Tweet 兼容的类。这个相当长。为完整起见,请访问https://gist.github.com/geoheil/8b15d44d07e11c32a461b78365e0c158
这项工作现在仍然失败:

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet

即使是一个可以说是兼容的类。所以这还不是一个完整的解决方案。即使根据https://issues.apache.org/jira/browse/flink-12501 它应该已经起作用了:

org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.checkAvroInitialized(AvroDeserializationSchema.java:147)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.checkAvroInitialized(RegistryAvroDeserializationSchema.java:79)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:64)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
    ... 9 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
    ... 13 more

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题