How to send a custom object to a Kafka topic with a producer

Posted by ua4mk5z4 on 2021-06-06 in Kafka

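I want to send instances of my Account class to a Kafka topic with a producer, and then aggregate them with Kafka Streams. However, I cannot send the object; I get this error:
Caused by: org.apache.kafka.common.KafkaException: bank.Account is not an instance of org.apache.kafka.common.serialization.Serializer
My producer class: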

public static void main(String[] args) {

        DataAccess dataAccess = new DataAccess();
        List<Account> accountList = dataAccess.read();

        final Logger logger = LoggerFactory.getLogger(Producer.class);
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,LongSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName());

        KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);

        for (Account account : accountList) {

            ProducerRecord<Long,Account> record = new ProducerRecord<Long, Account>("bank_account",account.getFromId(),account);

            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        logger.info("Record sent successfully. \n "+ "Topic : "+recordMetadata.topic() +"\n"+
                                "Partition : " + recordMetadata.partition() + "\n"+
                                "Offset : " +recordMetadata.offset() +"\n"+
                                "Timestamp: " +recordMetadata.timestamp() +"\n");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }

                    }
                    else{
                        logger.info("Error sending producer");
                    }
                }
            });
        }

        producer.flush();
        producer.close();
    }

The error is raised at this line:

KafkaProducer<Long,Account> producer = new KafkaProducer<Long, Account>(properties);

My Account class:

public class Account {

    private long fromId;
    private long amount;
    private long toId;
    private ZonedDateTime time;
}

So my question is: how can we send a custom object to a Kafka topic? After that, of course, I want to consume this data.

s4n0splo #1

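// 1. Register the serializers; the value serializer must be a class that implements Serializer
prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Employee.class.getName());

// 2. Create the producer and a record that carries the custom object
KafkaProducer<String, Employee> producer = new KafkaProducer<>(prop);

Employee emp = new Employee(1, "Arun", null); // the constructor takes an Address; none is set here

ProducerRecord<String, Employee> prodRecord = new ProducerRecord<>("aryan_topic", emp);

// 3. The Employee class, which also implements Kafka's Serializer interface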
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

//Developed by Arun Singh
public class Employee implements Serializable, Serializer {
    Integer empId;
    String empName;
    Address add;

    public Employee() {
    }

    public Employee(Integer empId, String empName, Address add) {
        this.empId = empId;
        this.empName = empName;
        this.add = add;
    }

    public Integer getEmpId() {
        return empId;
    }

    public String getEmpName() {
        return empName;
    }

    public Address getAdd() {
        return add;
    }

    public void setEmpId(Integer empId) {
        this.empId = empId;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public void setAdd(Address add) {
        this.add = add;
    }

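    // No configuration is needed for this serializer.
    public void configure(Map configs, boolean isKey) {
    }

    // Placeholder: returns an empty payload. Replace it with real serialization
    // (for example JSON) so that the record actually carries the Employee data.
    public byte[] serialize(String s, Object o) {
        return new byte[0];
    }

    // Header-aware overload; also a placeholder.
    public byte[] serialize(String topic, Headers headers, Object data) {
        return new byte[0];
    }

    // Nothing to release.
    public void close() {
    }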
}
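
Note that both serialize methods in the class above return an empty array, so a record sent this way would carry no data. Since Employee implements java.io.Serializable, one way to fill them in is Java's built-in object serialization. The following is only a minimal sketch: JavaSerializationCodec is a hypothetical helper name that is not part of the answer, it uses only the JDK, and it assumes every field type (including Address) is Serializable as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper (not part of the answer): converts any Serializable
// object to byte[] and back with the JDK's built-in object serialization.
final class JavaSerializationCodec {
    private JavaSerializationCodec() {}

    // Returns the serialized form of value; null stays null.
    static byte[] toBytes(Serializable value) {
        if (value == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into the byte array.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the object from bytes produced by toBytes; null stays null.
    static Object fromBytes(byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With such a helper, serialize(String s, Object o) could return JavaSerializationCodec.toBytes((Serializable) o) instead of new byte[0]. Java serialization ties the consumer to the same class definitions, which is one reason JSON, Avro, or Protobuf are often preferred.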
pokxtpni #2

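The problem is this line: properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Account.class.getName()); The value serializer must be a class that implements Serializer, so you have to write your own. It cannot be a plain class such as Account.
Some people serialize with JSON, others with Avro or Protobuf. How you turn your data into a byte[] is just an implementation detail.
See also: Java Kafka object serializer and deserializer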
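
To make the byte[] step concrete, here is a minimal sketch of one possible encoding for the Account class from the question, using only the JDK. Everything in it is an assumption for illustration: the Account constructor and getters, the AccountCodec name, and the layout (three longs followed by the length-prefixed ISO-8601 text of the timestamp). JSON, Avro, or Protobuf would fill the same role.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;

// Stand-in for the question's Account class; constructor and getters are assumed.
final class Account {
    private final long fromId;
    private final long amount;
    private final long toId;
    private final ZonedDateTime time;

    Account(long fromId, long amount, long toId, ZonedDateTime time) {
        this.fromId = fromId;
        this.amount = amount;
        this.toId = toId;
        this.time = time;
    }

    long getFromId() { return fromId; }
    long getAmount() { return amount; }
    long getToId() { return toId; }
    ZonedDateTime getTime() { return time; }
}

// Hypothetical helper: the byte[] conversion that a custom Kafka
// Serializer<Account> / Deserializer<Account> pair could delegate to.
final class AccountCodec {
    private AccountCodec() {}

    // Layout: fromId, amount, toId (8 bytes each), then a 4-byte length
    // and the UTF-8 bytes of the ISO-8601 timestamp.
    static byte[] toBytes(Account account) {
        if (account == null) {
            return null;
        }
        byte[] time = account.getTime().toString().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(3 * Long.BYTES + Integer.BYTES + time.length);
        buf.putLong(account.getFromId());
        buf.putLong(account.getAmount());
        buf.putLong(account.getToId());
        buf.putInt(time.length);
        buf.put(time);
        return buf.array();
    }

    // Reads the fields back in the same order they were written.
    static Account fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long fromId = buf.getLong();
        long amount = buf.getLong();
        long toId = buf.getLong();
        byte[] time = new byte[buf.getInt()];
        buf.get(time);
        ZonedDateTime parsed = ZonedDateTime.parse(new String(time, StandardCharsets.UTF_8));
        return new Account(fromId, amount, toId, parsed);
    }
}
```

In a Kafka project, a class such as AccountSerializer (a hypothetical name) implementing org.apache.kafka.common.serialization.Serializer<Account> would return AccountCodec.toBytes(data) from serialize(String topic, Account data), and its class name, rather than Account's, would be the value of VALUE_SERIALIZER_CLASS_CONFIG. A matching Deserializer<Account> on the consumer side would call AccountCodec.fromBytes.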
