Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin( "start" )
.next( "3" ).where( new FilterFunction< Tuple3< String, String, String > >() {
@Override
public boolean filter ( Tuple3< String, String, String > value ) throws Exception {
return value.f2.equals( "3" );
}
} )
.next( "4" ).subtype(Tuple.getTupleClass( 2 )).where( new FilterFunction< Tuple2< String, String> >() {
@Override
public boolean filter ( Tuple2< String, String > value ) throws Exception {
return value.f1.equals( "3" );
}
} )
子类型(tuple.gettupleclass(2)),并导致错误 Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'
我应该修改这个吗?但是怎么修改呢? Pattern< Tuple3< String, String, String >, ? > pattern
2017012更新
JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where< String >.EqualTo
joinedStreams = someStream
.join( otherStream )
.where( value -> value.f1 )
.equalTo( value -> value.f1 );
Pattern< Tuple, ? > pattern = Pattern.< Tuple > begin( "start" )
.subtype( Tuple3.class )
.where( evt -> evt.f2.equals( "3" ) )
.next( "4" )
.subtype( Tuple2.class )
.where( evt -> evt.f1.equals( "3" ) )
.within( Time.seconds( 10 ) );
PatternStream< ...> patternStream = CEP.pattern( joinedStreams, pattern );
我试过了,不知道该填什么 PatternStream< ...>
谢谢任何能提供帮助的人。
2条答案
按热度按时间3df52oht1#
这个呢:
你不必在开始后加下一个
注意子类型的字面意义,tuple3和tuple2应该扩展tuple。
如果要连接两个不同的数据流。
然后可以使用comap、coflatmap来获得相同的类型,例如将tuple2、tuple3转换为string:connectedstreams→ 数据流
以下是一些有用的链接,介绍了一个好的用例:
使用apache flink引入复杂事件处理(cep)
我翻译的中文版本
nkcskrwz2#
请尝试以下代码:
从普通类型开始
Tuple
使用混凝土类型Tuple2
以及Tuple3
对于子事件。并且此模式的数据流必须具有Tuple
类型。