如何在apache flink中分割nodeobject的数据

fivyi3re  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(517)

我使用flink来处理来自某些数据源(如kafka、pravega等)的数据。
在我的例子中,数据源是pravega,它为我提供了一个flink连接器。
我的数据源正在向我发送一些json数据,如下所示:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

以下是我的代码:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

如你所见,我用了 FlinkPravegaReader 以及一个合适的反序列化程序来获取来自pravega的json流。
然后我试着把json数据转换成一个字符串, KeyBy 数一数。
但是,我得到一个错误:

The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

看来 KeyBy 引发了此异常。
嗯,我不是FlinkMaven,所以我不知道为什么。我已经阅读了官方示例的源代码 WordCount . 在该示例中,有一个custom拆分器,用于将字符串数据拆分为单词。
所以我在想,在这种情况下我是否也需要使用某种拆分器?如果是,我应该使用什么样的分配器?你能给我举个例子吗?如果没有,为什么会出现这样的错误,如何解决?

hmmo2u0o

hmmo2u0o1#

我猜您已经阅读了有关如何指定键的文档
指定键
示例代码使用 keyby("word") 因为 word 是pojo类型的字段 WC .

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

在你的情况下,你把 map 前操作员 keyBy ,以及 map 操作员是 string . 所以显然没有 word 在你的案子里。如果你真的想把它分组 string stream,你得这样写 .keyBy(String::toString) 或者你甚至可以实现一个定制的 keySelector 生成自己的 key .
自定义键选择器

相关问题