flink错误处理和条件处理

i5desfxk  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(416)

我是新来的flink,已经通过网站/例子/博客开始了。我正在努力正确使用运算符。基本上我有两个问题
问题1:flink是否支持声明性异常处理,我需要处理parse/validate/。。。错误?
我可以使用org.apache.flink.runtime.operators.sort.exceptionhandler或类似工具来处理错误吗?
还是富/平面图功能是我的最佳选择?如果rich/flatmap是唯一的选项,那么有没有一种方法可以在rich/flatmap函数中获取流的句柄,这样就可以连接接收器进行错误处理?
问题2:我可以有条件地附加不同的Flume吗?
基于键控分割流中的特定字段,我需要选择不同的接收器,我是再次分割流还是使用rich/flatmap来处理?
我正在使用flink 1.3.2。这是我工作的相关部分

.....
    .....
    DataStream<String> eventTextStream = env.addSource(messageSource)

    KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
            // parse, transform or enrich
            .flatMap(new MyParseTransformEnrichFunction())
            .assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
            .keyBy("eventId");

    // split stream based on eventType as different reduce and windowing functions need to be applied
    SplitStream<EventPojo> splitStream = eventPojoStream
            .split(new EventStreamSplitFunction());

    // need to apply reduce function
    DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");

    // need to apply reduce function
    DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");

    // need to apply time based windowing function
    DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");

    ....
    ....

    env.execute("Event Processing");

我在这里使用正确的运算符吗?
更新1:
尝试使用@alpinegizmo建议的processfunction,但是没有成功,因为它依赖于一个键控流,在解析/验证输入之前我没有这个键控流。我得到“invalidProgrammeException:对于非复合类型,字段表达式必须等于'*'或''。”。
这是一个非常常见的用例,在这个用例中,您的第一个解析/验证输入并没有键控流,那么您如何解决它呢?
谢谢你的耐心和帮助。

vwkv1x7d

vwkv1x7d1#

你忽略了一个关键的组成部分。看看侧输出。
这种机制提供了一种类型安全的方法来生成任意数量的额外输出流。在其他用途中,这是一种报告错误的干净方法。在flink 1.3中,边输出只能与processfunction一起使用,但1.4将向processwindowfunction添加边输出。

相关问题