I am trying to read data from Kafka with a Flink DataStream. When I start the code, I get the following log output:
11:57:16,891 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not contain a getter for field _children
11:57:16,892 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode does not contain a setter for field _children
11:57:16,892 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
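So I get no error, only these log lines, but when I try to print the data stream nothing shows up. This is the code I use to produce to the Kafka topic and consume it with Flink: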
String topicName = "prova3";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.22:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
String msg = new JSONObject()
        .put("campo1", "Italy")
        .put("campo2", "Technology")
        .toString();
System.out.println(msg);
Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(props);
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
System.out.println(msg);
jsonNode = objectMapper.readTree(msg);
System.out.println(jsonNode);
producer.send(new ProducerRecord<String, JsonNode>(topicName, jsonNode));
producer.close();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.22:9092");
properties.setProperty("group.id", topicName);
DataStreamSource<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer<>("prova3", new JSONKeyValueDeserializationSchema(false), properties));
stream.print();
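For context on the three INFO lines quoted above: they come from Flink's type analysis, which reports that ObjectNode's private field _children has no getter or setter and that the class will therefore be handled as a GenericType. The sketch below is a simplified, plain-Java approximation of that kind of check, not Flink's actual TypeExtractor logic; the names PojoCheck, looksLikePojo, WithAccessors and WithoutAccessors are made up for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper: a rough approximation of a POJO check,
// NOT the real org.apache.flink.api.java.typeutils.TypeExtractor.
class PojoCheck {

    /** Private field with both a getter and a setter. */
    public static class WithAccessors {
        private String campo1;
        public WithAccessors() {}
        public String getCampo1() { return campo1; }
        public void setCampo1(String campo1) { this.campo1 = campo1; }
    }

    /** Private field without accessors, similar in spirit to ObjectNode._children. */
    public static class WithoutAccessors {
        private String children = "";
        public WithoutAccessors() {}
    }

    /** Returns true if a public method with this name and parameter list exists. */
    static boolean hasMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Simplified rule set: public class, public no-arg constructor,
     *  and every instance field is public or has getX/setX. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                continue; // only persistent instance fields matter here
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields need no accessors
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            boolean getter = hasMethod(type, "get" + suffix);
            boolean setter = hasMethod(type, "set" + suffix, f.getType());
            if (!getter || !setter) {
                return false; // corresponds to the "does not contain a getter/setter" messages
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WithAccessors.class));    // prints true
        System.out.println(looksLikePojo(WithoutAccessors.class)); // prints false
    }
}
```

Since those messages are logged at INFO level and only mention a performance effect, they probably do not explain the empty output by themselves. Separately, and assuming the excerpt above is complete, the Flink part never calls env.execute(); a DataStream program only defines the job until execute() is called, so that would be worth checking first.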