我是Kafka的新人。我想在我的spring项目中使用kafka生产者/消费者来替换activemq(jms)。我需要的是一个kafka生产者将我的消息对象发布到一个主题,然后一个消费者从该主题订阅它。
首先是我的自定义编码器,解码器也是如此(对于我的消息类configurationactionmsg):
@Component
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> {
public ConfigActionMessageEncoder() {
/* This constructor must be present for successful compile. */
}
public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(ConfigurationActionMsg actionMsg){
return SerializationUtils.serialize(actionMsg);
}}
下面是我对处理器和使用者的配置
@Configuration
@ComponentScan(basePackages = {"XXX"})
public class KafkaConfig {
@Bean
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder");
return new KafkaProducer<>(props);
}
@Bean
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("bootstrap.servers", "localhost:9092");
//We should only have one process running for consumer
props.put("group.id", "resolverActionTrigger");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder");
KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("configAction"));
return consumer;
}}
我不确定这是否是示例化生产者/消费者的正确方法。但这种方法行不通。因为我的kafkaproducer无法示例化。
一些调试信息:
原因:org.apache.kafka.common.kafkae异常:com.atlas.configengine2.jms.configactionmessageencoder不是org.apache.kafka.common.serialization.serializer的示例
但我不确定这是否是唯一的问题?那么如何编写自定义编码器呢?
3条答案
按热度按时间mbzjlibv1#
导入org.apache.kafka.common.serialization.serializer;
导入java.io.serializable;导入java.util.map;
//由arun singh开发
oipij1gg2#
你应该实施
org.apache.kafka.common.serialization.Serializer
不是Encoder
.请参见customserializer示例。
xienkqul3#
首先,应该使用
org.apache.kafka.common.serialization.Serializer
.第二,我认为您可以发送对象的字符串表示,例如json,并跟上
StringSerializer
在producer中,以避免实现自定义org.apache.kafka.common.serialization.Serializer
.