从kafka请求java对象时出错

6ioyuze2  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(271)

我开始学习Kafka,现在,我正在发送/接收序列化/理想化的java类。我的问题是:我在配置中遗漏了什么,所以我不能从kafka反序列化对象
这是我的课:

public class Foo { 
    private String item;
    private int quantity;
    private Double price;

    public Foo(String item, int quantity, final double price) {
        this.item = item;
        this.quantity = quantity;
        this.price = price;
    }

    public String getItem() { return item; }
    public int getQuantity() { return quantity; }
    public Double getPrice() { return price; }

    public void setQuantity(int quantity) { this.quantity = quantity; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public String toString() {
        return "item=" + item + ", quantity=" + quantity + ", price=" + price;
    }
}

我在主类中的属性:
producerpropsobject.put(producerconfig.client\u id\u config,appconfigs.applicationproducerserializedobject);producerpropsobject.put(producerconfig.bootstrap\u servers\u config,appconfigs.bootstrapservers);producerpropsobject.put(producerconfig.key\u serializer\u class\u config,stringserializer.class.getname());producerpropsobject.put(producerconfig.value\u serializer\u class\u config,fooserializer.class.getname());producerpropsobject.put(“主题”,appconfigs.topicnameforserializedobject);
consumerpropsobject.put(consumerconfig.group\u id\u config,appconfigs.applicationproducerserializedobject);consumerpropsobject.put(consumerconfig.bootstrap\u servers\u config,appconfigs.bootstrapservers);consumerpropsobject.put(consumerconfig.key_deserializer_class_config,stringdeserializer.class.getname());consumerpropsobject.put(consumerconfig.value_deserializer_class_config,foodeserializer.class.getname());consumerpropsobject.put(consumerconfig.max\u poll\u interval\u ms\u config,300000);consumerpropsobject.put(consumerconfig.enable\u auto\u commit\u config,true);consumerpropsobject.put(consumerconfig.auto_offset_reset_config,“最早”);consumerpropsobject.put(“主题”,appconfigs.topicnameforserializedobject);
以下是序列化程序/反序列化程序实现:

public class FooSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) { }

    public byte[] serialize(String s, Object o) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) { return new byte[0]; }
    }

    public void close() {  }
}

public class FooDeserializer implements org.apache.kafka.common.serialization.Deserializer {

    @Override
    public void close() { }

    @Override
    public Foo deserialize(String arg0, byte[] arg1) {

        //Option #1:
        //ObjectMapper mapper = new ObjectMapper();
        //Option #2:
        JsonFactory factory = new JsonFactory();
        factory.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
        ObjectMapper mapper = new ObjectMapper(factory);
        Foo fooObj = null;
        try {
            //Option #1:
            //fooObj = mapper.readValue(arg1, Foo.class);     // BREAKS HERE!!!
            //Option #2:
            fooObj = mapper.reader().forType(Foo.class).readValue(arg1); // BREAKS HERE!!!

        }
        catch (Exception e) { e.printStackTrace(); }

        return fooObj;
    }

}

最后,我试着从main生产和消费我的食物:
似乎,它工作得很好,因为我看到Kafka主题我的关键和价值以后

public void produceObjectToKafka(final Properties producerProps) { 
    final String[] ar = new String[]{"Matrix", "Naked Gun", "5th Element", "Die Hard", "Gone with a wind"};
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps);
    final Foo j = new Foo(ar[getAnInt(4)], getAnInt(10), getAnDouble());
    producer.send(new ProducerRecord<>(producerProps.getProperty("topic"), j.getItem(), j.toString().getBytes()));
    producer.flush();   
    producer.close();
}

然而,当我的消费者正在捕捉输出时:

public void consumeFooFromKafka(final Properties consumerProps) {
    final Consumer<String, Foo> myConsumer = new KafkaConsumer<>(consumerProps);
    final Thread separateThread = new Thread(() -> {
        try {
            myConsumer.subscribe(Collections.singletonList(consumerProps.getProperty("topic")));
            while (continueToRunFlag) {
                final StringBuilder sb = new StringBuilder();
                final ConsumerRecords<String, Foo> consumerRecords = myConsumer.poll(Duration.ofMillis(10));
                if (consumerRecords.count() > 0) {
                    for (ConsumerRecord<String, Foo> cRec : consumerRecords) {
                        sb.append(  cRec.key()  ).append("<<").append(cRec.value().getItem() + ",").append(cRec.value().getQuantity() + ",").append(cRec.value().getPrice()).append("|");
                   }
               }
               if (sb.length() > 0) { System.out.println(sb.toString()); }
            }
        }
        finally {
            myConsumer.close();
        }
    });
    separateThread.start();
}

============================================所以,实际上是通过运行“consumefoofromkafka”,当它触发“foodeserializer”。。。。。。在那里,我总是有相同的错误(不管选项1或选项2):
异常:方法引发了“com.fasterxml.jackson.core.jsonparseexception”异常。detailedmessage:意外的字符('¬' (代码172):应为有效值(json字符串、数字、数组、对象或标记“null”、“true”或“false”)
非常感谢您的帮助。。。。。。。提前谢谢你,史蒂夫

iecba09b

iecba09b1#

如果您想从json反序列化,那么您需要将其序列化为json,也可以在您的序列化程序中使用jackson,一切都应该正常

public class FooSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) { }

    public byte[] serialize(String s, Object o) {
        try {
            ObjectMapper om = new ObjectMapper();//objectmapper from jackson
            byte[] b = om.writeValueAsString(o).getBytes();
            return b;

        } catch (IOException e) { return new byte[0]; }
    }

    public void close() {  }
}
pqwbnv8z

pqwbnv8z2#

我不知道为什么要使用bytearray outputstream,但尝试在反序列化程序中读取json,但这解释了错误。您甚至可以通过直接调用serialize/deserialize方法来测试它,而不必使用kafka
在提供的链接中,序列化程序使用 objectMapper.writeValueAsString ,它返回json文本,而不是特定于java的outputstream。如果您想在不同的编程语言之间使用和生成数据(大多数公司都是这样),那么您应该避免使用这种特定的序列化格式
注意:confluent为kafka提供了avro、protobuf和json序列化程序,因此如果您想使用其中一种格式,就不需要编写自己的序列化程序

相关问题