ApacheFlink—是否可以反序列化avro消息(使用来自kafka的消息),而不在confluentregistryavrodeserializationschema中提供读取器模式

dauxcl2d  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(412)

我在apache flink中使用kafka连接器来访问由合流kafka服务的流。
除了架构注册表url ConfluentRegistryAvroDeserializationSchema.forGeneric(...) 应为“reader”架构。我不想提供read模式,而是想使用同一个writer的模式(在注册表中查找)来读取消息,因为使用者不会有最新的模式。

FlinkKafkaConsumer010<GenericRecord> myConsumer =
        new FlinkKafkaConsumer010<>("topic-name", ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"), properties);
myConsumer.setStartFromLatest();

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html “使用这些反序列化架构记录将与从架构注册表检索并转换为静态提供的架构一起读取”
既然我不想在用户端保留模式定义,那么如何使用writer的模式反序列化来自kafka的avro消息呢?
谢谢你的帮助!

slmsl1lt

slmsl1lt1#

我认为不可能直接使用 ConfluentRegistryAvroDeserializationSchema.forGeneric . 它打算与读卡器模式一起使用,并且它们具有检查这一点的前提条件。
你必须实现你自己的。两件重要的事情:
specific.avro.reader 为false(否则会得到特定的记录)
这个 KafkaAvroDeserializer 必须延迟初始化(因为它本身不可序列化,因为它持有对schema registry客户机的引用)

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KafkaGenericAvroDeserializationSchema
    implements KeyedDeserializationSchema<GenericRecord> {

  private final String registryUrl;
  private transient KafkaAvroDeserializer inner;

  public KafkaGenericAvroDeserializationSchema(String registryUrl) {
    this.registryUrl = registryUrl;
  }

  @Override
  public GenericRecord deserialize(
      byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    return (GenericRecord) inner.deserialize(topic, message);
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false;
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return TypeExtractor.getForClass(GenericRecord.class);
  }

  private void checkInitialized() {
    if (inner == null) {
      Map<String, Object> props = new HashMap<>();
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
      SchemaRegistryClient client =
          new CachedSchemaRegistryClient(
              registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
      inner = new KafkaAvroDeserializer(client, props);
    }
  }
}
env.addSource(
  new FlinkKafkaConsumer<>(
    topic, 
    new KafkaGenericAvroDeserializationSchema(schemaReigstryUrl), 
    kafkaProperties));

相关问题