我试图用以下代码从Kafka主题中获取json:
public class FlinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream messageStream = env.addSource(
new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
, new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));
messageStream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
});
env.execute();
}
}
问题是:
1) 此程序由于
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
The problem is at line: `messageStream.map(....`
2) 也许上述问题与 DataStream
没有类型。但如果我想: DataStream<String> messageStream = env.addSource(...
由于 cannot resolve constructor FlinkKafkaConsumer09 ...
这个 pom.xml
(重要部分):
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
我一直在flink中寻找一些使用json反序列化模式但没有成功的代码。我刚刚找到了 JSONKeyValueDeserializationSchema
在这个环节
有人知道如何做正确的事吗?
谢谢
2条答案
按热度按时间92dk7w1h1#
我遵循了vishnu-viswanath的回答,但是jsonkeyvaluedeserializationschema在json解析器步骤中引发了一个异常,即使是对于一个简单的json
{"name":"John Doe"}
.抛出的代码是:
输出:
我成功地使用了另一个反序列化模式jsondeserializationschema
bwitn5fc2#
你的错误已经到了底线
messageStream.map(new MapFunction<String, String>()
. 您定义的mapfunction需要类型字符串的输入和类型字符串的输出,但是由于您使用的是JSONKeyValueDeserializationSchema
将字符串转换为com.fasterxml.jackson.databind.node.ObjectNode
mapfunction实际上应该期望输入相同类型的objectnode。请尝试以下代码。