有没有可能向Kafka发布用 KafkaAvroSerializer
通过汇合。我使用的是flink1.9.1,我已经看到一些新版本的flinkavro(1.11.0)正在进行开发,但是我坚持使用这个版本。
我想用新引进的 KafkaSerializationSchema
用于将消息序列化到汇合模式注册表和kakfa。
这里我有一个类正在转换一个类类型 T
但是我想使用合流序列化。
public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {
public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);
final private String topic;
public KafkaMessageSerialization(String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final Schema schema = event.getSchema();
final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
try {
writer.write(event, binEncoder);
binEncoder.flush();
} catch (final Exception e) {
LOG.error("serialization error", e);
throw new RuntimeException(e);
}
return new ProducerRecord<>(topic, outputStream.toByteArray());
}
}
使用比较方便 .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))
1条答案
按热度按时间slmsl1lt1#
我在同样的情况下,基于你的解决方案,我写了这个类。我已经用flink1.10.1测试过了。
confluent有一个属性格式,其中包含一个魔术字节和模式id(4字节)。更多信息请查看https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-格式