Serializing Kafka messages with the Confluent Schema Registry on Flink 1.9.1

alen0pnh · posted 2021-06-26 in Flink

Is it possible to publish to Kafka using the Confluent KafkaAvroSerializer? I'm on Flink 1.9.1; I've seen that newer flink-avro versions (1.11.0) are in development, but I'm stuck on this version.
I'd like to use the newly introduced KafkaSerializationSchema to serialize messages against the Confluent Schema Registry and write them to Kafka.
Here is a class that serializes events of type T, but I want it to use Confluent-style serialization.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            // Write the event as plain Avro binary, without any Schema Registry framing.
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}

It is convenient to use with .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE)).
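producerProps is not shown in the question; a minimal sketch of what it might contain, assuming a broker at the placeholder address localhost:9092:

Properties producerProps = new Properties();
// Placeholder broker address; FlinkKafkaProducer forwards these
// properties to the underlying Kafka producer.
producerProps.setProperty("bootstrap.servers", "localhost:9092");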

slmsl1lt #1

I was in the same situation. Based on your solution I wrote the class below; I have tested it with Flink 1.10.1.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class ConfluentAvroMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(ConfluentAvroMessageSerialization.class);

    private final String topic;
    private final int schemaId;
    private final int magicByte;

    public ConfluentAvroMessageSerialization(String topic, String schemaRegistryUrl) throws IOException, RestClientException {
        magicByte = 0; // the Confluent wire format always uses 0 as the magic byte
        this.topic = topic;

        // Look up the latest schema ID for the subject "<topic>-value",
        // the default subject name used by Confluent serializers.
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
        SchemaMetadata schemaMetadata = schemaRegistry.getLatestSchemaMetadata(topic + "-value");
        schemaId = schemaMetadata.getId();

        LOG.info("Confluent schema ID {} for topic {} found", schemaId, topic);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
            outputStream.write(magicByte);     // byte 0: Confluent magic byte
            outputStream.write(schemaIdBytes); // bytes 1-4: schema ID (big-endian)
            writer.write(event, binEncoder);   // remaining bytes: Avro binary data
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("Schema Registry serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}
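By analogy with the sink in the question, wiring this class up might look like the following sketch; SINK_TOPIC, producerProps, and the registry URL http://localhost:8081 are placeholder values, and the constructor's checked exceptions still need to be handled by the caller:

stream.addSink(new FlinkKafkaProducer<>(
        SINK_TOPIC,
        // hypothetical Schema Registry address
        new ConfluentAvroMessageSerialization<>(SINK_TOPIC, "http://localhost:8081"),
        producerProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));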

Confluent uses a proprietary wire format consisting of a magic byte followed by the schema ID (4 bytes). For more information, see https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
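As an illustration of that layout, the five-byte header written above can be read back like this (a sketch; value is assumed to be the raw byte[] of a consumed record):

ByteBuffer buffer = ByteBuffer.wrap(value);
byte magic = buffer.get();      // byte 0: magic byte, always 0
int schemaId = buffer.getInt(); // bytes 1-4: schema ID, big-endian
// buffer.remaining() bytes: the Avro binary-encoded payload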
