kafka中包含反序列化的类型元数据

oprakyz7  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我在《Spring的Kafka》的听众那里做反序列化。但这假设类型信息是由SpringKafka制作人包含或发送的。在我的例子中,json是由debezium mysqlconnector发送的,它没有添加这个元数据。所以我想把它添加到请求中。我知道它放在jsonserializer的请求中的某个地方,我查看了源代码,但无法确切地理解如何在序列化过程中将元数据类型一般地添加到请求中。尤其是哪个字段保存此类型信息?是序列化的java对象的类名吗?我认为仅仅设置一个默认的序列化程序是行不通的,因为我有多个消费者在听不同的主题。除了最简单的情况外,设置一个默认值是行不通的,因为我有许多消费者和类型,我正在反序列化。所以这个答案在我的案例kafka中是行不通的-在consumer中反序列化对象
update尝试在反序列化程序上使用方法类型,但存在另一个问题:kafka-spring反序列化程序从未调用returntype静态方法

xfyts7mz

xfyts7mz1#

看见

public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {

    /**
     * Default header name for type information.
     */
    public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__";

    /**
     * Default header name for container object contents type information.
     */
    public static final String DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__ContentTypeId__";

    /**
     * Default header name for map key type information.
     */
    public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__";

    /**
     * Default header name for key type information.
     */
    public static final String KEY_DEFAULT_CLASSID_FIELD_NAME = "__Key_TypeId__";

    /**
     * Default header name for key container object contents type information.
     */
    public static final String KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__Key_ContentTypeId__";

    /**
     * Default header name for key map key type information.
     */
    public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";

2组标题(键和值)。 TypeId 是简单的类
如果 TypeId 是一个容器
List<?> ContentTypeId 是包含的类型。
如果 TypeId 是一个
Map Key_TypeId 是键类型。
这允许您重建 Map<Foo, Bar> .
这些头可以包含完全限定的类名,也可以包含通过 classIdMappings Map。
但是,由于版本2.5,使用新的
使用方法确定类型。
这样,您就可以设置自己的头并在方法中检查它们。
编辑
下面是一个简单的例子:

@SpringBootApplication
public class Gitter76Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter76Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("gitter76").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "gitter76", topics = "gitter76")
    public void listen(Foo in) {
        System.out.println(in);
    }

}
public class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo
$ kafkacat -P -b localhost:9092 -t gitter76 -H __TypeId__=com.example.demo.Foo
{"bar":"baz"}
^C
2020-08-08 09:32:10.034  INFO 58146 --- [ gitter76-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : gitter76: partitions assigned: [gitter76-0]
Foo [bar=baz]

相关问题