这是我第一次使用ApacheFlink(1.3.1)并提出问题。更详细地说,我正在与Flink核心,Flinkcep和Flink流媒体库。我的应用程序是一个akka actor系统,它使用来自rabbitmq的消息,不同的参与者处理这些消息。在一些actor中,我想示例化一个 StreamExecutionEnvironment
并处理传入的消息。因此,我编写了一个扩展 RichSourceFunction
班级。除了一件事之外,一切都正常:我不知道如何将数据发送到我的flink扩展。以下是我的设置:
public class FlinkExtension {
private static StreamExecutionEnvironment environment;
private DataStream<ValueEvent> input;
private CustomSourceFunction function;
public FlinkExtension(){
environment = StreamExecutionEnvironment.getExecutionEnvironment();
function = new CustomSourceFunction();
input = environment.addSource(function);
PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());
DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
@Override
public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
return null; //TODO
}
});
warnings.print();
try {
environment.execute();
} catch(Exception e){
e.printStackTrace();
}
}
private Pattern<ValueEvent, ?> _pattern(){
return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
@Override
public boolean filter(ValueEvent value) throws Exception {
return value.getValue() > 10;
}
});
}
public void sendData(ValueEvent value){
function.sendData(value);
}
}
这是我的自定义源函数:
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {
private volatile boolean run = false;
private SourceContext<ValueEvent> context;
@Override
public void open(Configuration parameters){
run = true;
}
@Override
public void run(SourceContext<ValueEvent> ctx) throws Exception {
this.context = ctx;
while (run){
}
}
public void sendData(ValueEvent value){
this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
}
@Override
public void cancel() {
run = false;
}
}
所以我想调用这个方法 sendData
在我的 FlinkExtension
从外部类以连续的方式将数据写入我的 FlinkExtension
. 下面是我的junit测试,它应该向扩展发送数据,然后将数据写入 SourceContext
.
@Test
public void testSendData(){
FlinkExtension extension = new FlinkExtension();
extension.sendData(new ValueEvent(30));
}
但是如果我运行测试,什么都不会发生,应用程序挂起在 CustomSourceFunction
. 我也试着在这个世界上创造一条新的无止境的线 CustomSourceFunction
运行方法。
总结一下:有人知道如何连续地将数据从应用程序写入flink示例吗?
2条答案
按热度按时间4smxwvx51#
flink源连接器通过在while(run)循环中调用collect()(或collectwithtimestamp())的run()方法发出连续的数据流。如果您想学习一个示例,apachenifi源代码并不像大多数代码那样复杂;下面是它的run方法。
ijnw1ujt2#
问题是有不同的对象示例
CustomSourceFunction
正在被run
方法和步骤sendData
方法。因此context
对象未在方法之间共享,正在添加新的ValueEvent
不起作用。要解决此问题,请存储
run
方法作为CustomSourceFunction
班级。当你需要创建一个新的ValueEvent
,调用sendData
方法。请参见下面的示例代码
添加新规则