我使用flink来处理来自某些数据源(如kafka、pravega等)的数据。
在我的例子中,数据源是pravega,它为我提供了一个flink连接器。
我的数据源正在向我发送一些json数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
以下是我的代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如你所见,我用了 FlinkPravegaReader
以及一个合适的反序列化程序来获取来自pravega的json流。
然后我试着把json数据转换成一个字符串, KeyBy
数一数。
但是,我得到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
看来 KeyBy
引发了此异常。
嗯,我不是FlinkMaven,所以我不知道为什么。我已经阅读了官方示例的源代码 WordCount
. 在该示例中,有一个custom拆分器,用于将字符串数据拆分为单词。
所以我在想,在这种情况下我是否也需要使用某种拆分器?如果是,我应该使用什么样的分配器?你能给我举个例子吗?如果没有,为什么会出现这样的错误,如何解决?
1条答案
按热度按时间hmmo2u0o1#
我猜您已经阅读了有关如何指定键的文档
指定键
示例代码使用
keyby("word")
因为word
是pojo类型的字段WC
.在你的情况下,你把
map
前操作员keyBy
,以及map
操作员是string
. 所以显然没有word
在你的案子里。如果你真的想把它分组string
stream,你得这样写.keyBy(String::toString)
或者你甚至可以实现一个定制的keySelector
生成自己的key
.自定义键选择器