我想发送我的帐户类与生产者到我的Kafka主题,然后我将与Kafka流聚合。但是,我无法发送收到的对象错误:
原因:org.apache.kafka.common.kafkaexception:bank.account不是org.apache.kafka.common.serialization.serializer的示例
我的制作人课程:
public static void main(String[] args) {
DataAccess dataAccess = new DataAccess();
List<Account> accountList = dataAccess.read();
final Logger logger = LoggerFactory.getLogger(Producer.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,LongSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName());
KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);
for (Account account : accountList) {
ProducerRecord<Long,Account> record = new ProducerRecord<Long, Account>("bank_account",account.getFromId(),account);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
logger.info("Record sent successfully. \n "+ "Topic : "+recordMetadata.topic() +"\n"+
"Partition : " + recordMetadata.partition() + "\n"+
"Offset : " +recordMetadata.offset() +"\n"+
"Timestamp: " +recordMetadata.timestamp() +"\n");
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
else{
logger.info("Error sending producer");
}
}
});
}
producer.flush();
producer.close();
}
它给出了这行的错误:
KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);
我的帐户类别:
public class Account {
private long fromId;
private long amount;
private long toId;
private ZonedDateTime time;
}
所以我的问题是,我们如何才能发送自定义对象到Kafka主题?在那之后,我当然想用这个信息。
2条答案
按热度按时间s4n0splo1#
pokxtpni2#
这条线
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName());
你必须实现你自己的目标Serializer
班级。它不能是一个普通的类。有些人使用json进行序列化,有些人使用avro或protobuf。但是你要把数据放到
byte[]
只是一个实现细节。javakafka对象序列化程序和反序列化程序