apache flink从kafka读取avro byte[]

koaltpgm  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(524)

在回顾这些例子时,我看到了很多:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

我看到他们已经知道了模式。
我不知道模式,直到我把字节[]读入一个通用记录,然后得到模式(因为它可能会随着记录的变化而变化)
有人能给我指一张table吗 FlinkKafkaConsumer08 那是从 byte[] 到一个Map过滤器,这样我可以删除一些前导位,然后加载 byte[] 变成普通记录?

ccrfmcuu

ccrfmcuu1#

我也在做类似的事情(我用的是09消费者)
在自定义反序列化程序的主代码传递中:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());

自定义反序列化模式读取字节,找出模式并/或从模式注册表检索它,反序列化为genericrecord并返回genericrecord对象。

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        //do your stuff here, strip off your bytes
        //deserialize and create your GenericRecord 
        return (T) (myavroevent);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }

}
xxb16uws

xxb16uws2#

如果您使用confluent的schema registry,我相信首选的解决方案是使用confluent提供的avro serde。这样,我们就打电话 deserialize() 而要使用的最新版本avro模式的分辨率是在后台自动完成的,不需要字节操作。
它可以归结为以下内容(scala中的示例代码,java解决方案非常类似):

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
  false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
                       topic: String, partition: Int, offset: Long): KafkaKV = {

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

    KafkaKV(key, value)
    }

...

此方法要求消息生产者也与模式注册表集成,并在那里发布模式。这可以用与上面非常相似的方法来完成,使用confluent的 KafkaAvroSerializer 我在这里发布了一个详细的解释:如何将flink与confluent的模式注册表集成

相关问题