据我所知,Kafka使用信息转换器来传递信息。
为了 Avro
它使用 AvroSchemaRegistryClientMessageConverter
它使用缓存进行优化。
我们正在使用 Hazelcast
作为缓存提供者,因此需要 Serializable
,我们知道为什么。。
现在,avro架构不是作为可序列化的生成的,当我尝试发送消息时,它失败了:
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'org.apache.avro.Schema$RecordSchema'
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:82) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:157) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:133) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toData(AbstractSerializationService.java:118) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.map.impl.proxy.MapProxySupport.toDataWithStrategy(MapProxySupport.java:1241) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.map.impl.proxy.MapProxySupport.getInternal(MapProxySupport.java:343) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.map.impl.proxy.MapProxyImpl.get(MapProxyImpl.java:120) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.spring.cache.HazelcastCache.lookup(HazelcastCache.java:162) ~[hazelcast-spring-3.12.9.jar:3.12.9]
at com.hazelcast.spring.cache.HazelcastCache.get(HazelcastCache.java:73) ~[hazelcast-spring-3.12.9.jar:3.12.9]
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:299) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar:2.2.1.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:263) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:295) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:613) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
... 158 common frames omitted
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: There is no suitable serializer for class org.apache.avro.Schema$RecordSchema
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.serializerFor(AbstractSerializationService.java:487) ~[hazelcast-3.12.9.jar:3.12.9]
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:146) ~[hazelcast-3.12.9.jar:3.12.9]
... 174 common frames omitted
根据我的发现,我试图用这种方式配置Kafka:
@Configuration
public class KafkaConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
但它仍在失败。
显然,问题是如何克服这一点?也许我配置了错误的东西,但我真的不知道如何正确地做它。
编辑:当我向项目中添加第二个缓存管理器(cafferine)并将bean标记为@primary时,cachemanager字段设置为 CaffeineCacheManager
序列化也没有问题。但是我不想在项目中有两个缓存提供程序,我需要显式地为缓存方法上的hazelcast设置缓存管理器。。
请协助,
谢谢您!
暂无答案!
目前还没有任何答案,快来回答吧!