Spring BootKafka Camel avro消费者

hivapdat  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(336)

我一直在努力寻找样本代码 Spring 启动Kafka Camel avro消费者没有运气。我在以下url找到了spring camel kafka消费者和生产者示例:
https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/
但缺少的是avro部分。我在这里查看avro的camel文档:
http://camel.apache.org/avro.html
我的具体问题是,一旦我的bean是从avro模式创建的,并且我有了pojo类,我该如何告诉camel(上面的spring示例)用户avro序列化?具体来说,我指的是这行代码:from(“kafka:localhost:9092?topic=test&zookeeperhost=localhost&zookeeperport=2181&groupid=group1&serializerclass=kafka.serializer.stringencoder“).bean(kafkaoutputbean.class);
其中序列化程序是stringencoder。如何告诉camel使用avro序列化?

brc7rcf0

brc7rcf01#

我找到了自己的答案。所以我想和大家分享一下。实际上是的 serializerClass=org.springframework.integration.kafka.serializer.avro.AvroSerializer . 代码非常简单,你几乎可以自己写。

public class AvroSerializer<T> {

        public T deserialize(final byte[] bytes, final DatumReader<T> reader) throws IOException {
            final Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return reader.read(null, decoder);
        }

        public byte[] serialize(final T input, final DatumWriter<T> writer) throws IOException {
            final ByteArrayOutputStream stream = new ByteArrayOutputStream();

            final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
            writer.write(input, encoder);
            encoder.flush();

            return stream.toByteArray();
        }
    }

相关问题