Java problem transferring JSON from Kafka with Flink

bvn4nwqk  posted on 2021-06-04 in Kafka

I am trying to retrieve data from Kafka with a Flink DataStream. When I start the code, I get the following log:

    11:57:16,891 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not contain a getter for field _children
    11:57:16,892 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not contain a setter for field _children
    11:57:16,892 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

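So there is no error, only this log, but when I try to print the data stream, nothing appears. This is the code I use to produce to the Kafka topic and to consume it with Flink: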

    String topicName = "prova3";

    // Producer side: send one JSON message to the topic.
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.22:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    String msg = new JSONObject()
            .put("campo1", "Italy")
            .put("campo2", "Technology")
            .toString();
    System.out.println(msg);

    Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(props);

    // Parse the JSON string into a JsonNode so that JsonSerializer can write it.
    ObjectMapper objectMapper = new ObjectMapper();
    System.out.println(msg);
    JsonNode jsonNode = objectMapper.readTree(msg);
    System.out.println(jsonNode);

    // The record is sent without a key.
    producer.send(new ProducerRecord<String, JsonNode>(topicName, jsonNode));
    producer.close();

    // Flink side: read the same topic as a stream of ObjectNode and print it.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.1.22:9092");
    properties.setProperty("group.id", topicName);
    DataStreamSource<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer<>("prova3", new JSONKeyValueDeserializationSchema(false), properties));

    stream.print();
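
For comparison, here is a minimal sketch of the Flink side only, written under a few assumptions that the question does not confirm. The three `TypeExtractor` lines are INFO messages, not errors: they only say that `ObjectNode` will be serialized as a generic type. A Flink DataStream program is built lazily, so `stream.print()` only declares a sink, and nothing runs until `env.execute()` is called. The code shown above stops at `stream.print()`, so a missing `execute()` call is one plausible reason for the empty output. A second possible reason is the start position: the message is produced before the job starts, and a consumer group with no committed offsets may begin at the latest offset. The class name `KafkaJsonPrintJob`, the helper `consumerProperties`, the job name, and the choice of `setStartFromEarliest()` are illustrative, not taken from the original post.

```java
import java.util.Properties;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

// Hypothetical class name, used only for this sketch.
class KafkaJsonPrintJob {

    // Hypothetical helper: builds the Kafka consumer properties used in the question.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same topic, schema, and broker address as in the question.
        FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(
                "prova3",
                new JSONKeyValueDeserializationSchema(false),
                consumerProperties("192.168.1.22:9092", "prova3"));

        // Assumption: read from the beginning of the topic so that messages
        // produced before the job started are also consumed.
        consumer.setStartFromEarliest();

        DataStream<ObjectNode> stream = env.addSource(consumer);
        stream.print();

        // Without this call the pipeline is only declared, never run.
        env.execute("kafka-json-print");
    }
}
```

With `JSONKeyValueDeserializationSchema`, each printed element is an `ObjectNode` that wraps the message under a `value` field (and a `key` field when the record has a key), so the payload fields `campo1` and `campo2` would be nested one level down.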
