带有kafka消息传递和hazelcast作为持久性提供者的ApacheAvro

x759pob2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(202)

据我所知,Kafka使用信息转换器来传递信息。
为了 Avro 它使用 AvroSchemaRegistryClientMessageConverter 它使用缓存进行优化。
我们正在使用 Hazelcast 作为缓存提供者,因此需要 Serializable ,我们知道为什么。。

现在,avro架构不是作为可序列化的生成的,当我尝试发送消息时,它失败了:

Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'org.apache.avro.Schema$RecordSchema'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:82) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:157) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:133) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toData(AbstractSerializationService.java:118) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.map.impl.proxy.MapProxySupport.toDataWithStrategy(MapProxySupport.java:1241) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.map.impl.proxy.MapProxySupport.getInternal(MapProxySupport.java:343) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.map.impl.proxy.MapProxyImpl.get(MapProxyImpl.java:120) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.spring.cache.HazelcastCache.lookup(HazelcastCache.java:162) ~[hazelcast-spring-3.12.9.jar:3.12.9]
    at com.hazelcast.spring.cache.HazelcastCache.get(HazelcastCache.java:73) ~[hazelcast-spring-3.12.9.jar:3.12.9]
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:299) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar:2.2.1.RELEASE]
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar:2.2.1.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:263) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:295) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:613) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    ... 158 common frames omitted
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: There is no suitable serializer for class org.apache.avro.Schema$RecordSchema
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.serializerFor(AbstractSerializationService.java:487) ~[hazelcast-3.12.9.jar:3.12.9]
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:146) ~[hazelcast-3.12.9.jar:3.12.9]
    ... 174 common frames omitted

根据我的发现,我试图用这种方式配置Kafka:

@Configuration
public class KafkaConfig {
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

但它仍在失败。
显然,问题是如何克服这一点?也许我配置了错误的东西,但我真的不知道如何正确地做它。
编辑:当我向项目中添加第二个缓存管理器(cafferine)并将bean标记为@primary时,cachemanager字段设置为 CaffeineCacheManager 序列化也没有问题。但是我不想在项目中有两个缓存提供程序,我需要显式地为缓存方法上的hazelcast设置缓存管理器。。
请协助,
谢谢您!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题