AmazonWeb服务—解析传入数据流中的json以在flink中执行简单转换

ruarlubt  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(315)

我第一次将apache flink与aws kines结合使用。基本上,我的目标是以这样一种方式转换来自动觉流的传入数据,即我可以执行简单的转换,例如过滤和聚合。
我使用以下方法添加源代码:

return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

最终,当我打印传入流时,我得到了预期的json数据:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();

这是打印的示例结果:
{“event_num”:“5530”,“timestmap”:“2019-03-04 14:29:44.882376”,“amount”:“80.4”,“type”:“purchase”}{“event_num”:“5531”,“timestmap”:“2019-03-04 14:29:44.881379”,“amount”:“11.98”,“type”:“service”}
有人能告诉我如何访问这些json元素,使我能够执行简单的转换,比如只选择包含“service”的记录作为类型吗?

of1yzvn4

of1yzvn41#

当你使用 SimpleStringSchema 生成的事件流的类型为 String . 因此,您需要首先解析字符串,然后应用过滤器等。
您可能想看看jsonnodedeserializationschema,它将生成 ObjectNode .

相关问题