java—以连续的方式将自定义源代码中的数据写入flink

mrzz3bfm  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(298)

这是我第一次使用ApacheFlink(1.3.1)并提出问题。更详细地说,我正在与Flink核心,Flinkcep和Flink流媒体库。我的应用程序是一个akka actor系统,它使用来自rabbitmq的消息,不同的参与者处理这些消息。在一些actor中,我想示例化一个 StreamExecutionEnvironment 并处理传入的消息。因此,我编写了一个扩展 RichSourceFunction 班级。除了一件事之外,一切都正常:我不知道如何将数据发送到我的flink扩展。以下是我的设置:

public class FlinkExtension {

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

这是我的自定义源函数:

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

所以我想调用这个方法 sendData 在我的 FlinkExtension 从外部类以连续的方式将数据写入我的 FlinkExtension . 下面是我的junit测试,它应该向扩展发送数据,然后将数据写入 SourceContext .

@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

但是如果我运行测试,什么都不会发生,应用程序挂起在 CustomSourceFunction . 我也试着在这个世界上创造一条新的无止境的线 CustomSourceFunction 运行方法。
总结一下:有人知道如何连续地将数据从应用程序写入flink示例吗?

4smxwvx5

4smxwvx51#

flink源连接器通过在while(run)循环中调用collect()(或collectwithtimestamp())的run()方法发出连续的数据流。如果您想学习一个示例,apachenifi源代码并不像大多数代码那样复杂;下面是它的run方法。

ijnw1ujt

ijnw1ujt2#

问题是有不同的对象示例 CustomSourceFunction 正在被 run 方法和步骤 sendData 方法。因此 context 对象未在方法之间共享,正在添加新的 ValueEvent 不起作用。
要解决此问题,请存储 run 方法作为 CustomSourceFunction 班级。当你需要创建一个新的 ValueEvent ,调用 sendData 方法。
请参见下面的示例代码

package RuleSources;

import Rules.Rule;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.ArrayList;

public class DynamicRuleSource extends AlertingRuleSource {
    private static DynamicRuleSource sourceObj;

    private SourceContext<Rule> ctx;

    public static DynamicRuleSource getSourceObject() {
        return sourceObj;
    }

    public void run(SourceContext<Rule> ctx) throws Exception {
        this.ctx = ctx;
        sourceObj = this;
        while(true) {
            Thread.sleep(100);
        }
    }

    public void addRule(Rule rule) {
        ctx.collect(rule);
    }

    @Override
    public void cancel() {
    }
}

添加新规则

public static void addRule(Rule rule) throws Exception {
        AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
        sourceObject.addRule(rule);
    }

相关问题