我第一次将apache flink与aws kines结合使用。基本上,我的目标是以这样一种方式转换来自动觉流的传入数据,即我可以执行简单的转换,例如过滤和聚合。
我使用以下方法添加源代码:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
最终,当我打印传入流时,我得到了预期的json数据:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();
这是打印的示例结果:
{“event_num”:“5530”,“timestmap”:“2019-03-04 14:29:44.882376”,“amount”:“80.4”,“type”:“purchase”}{“event_num”:“5531”,“timestmap”:“2019-03-04 14:29:44.881379”,“amount”:“11.98”,“type”:“service”}
有人能告诉我如何访问这些json元素,使我能够执行简单的转换,比如只选择包含“service”的记录作为类型吗?
1条答案
按热度按时间of1yzvn41#
当你使用
SimpleStringSchema
生成的事件流的类型为String
. 因此,您需要首先解析字符串,然后应用过滤器等。您可能想看看jsonnodedeserializationschema,它将生成
ObjectNode
.