我正在处理以下程序,并设置了WatermarkStrategy,但是当我使用inEventTime()方法运行该程序时,它不会给予任何输出。
注意:当我在pattern上使用inProcessingTime()时,同样的程序也能工作。
public class FlinkCEPTest {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
final String bootstrapServers = parameter.get("kafka.broker", "localhost:9092,broker:29092");
final String inputTopic_1 = parameter.get("input.topic.1","acctopic");
final String inputTopic_2 = parameter.get("input.topic.2","txntopic");
final String outputTopic = parameter.get("output.topic.q","alerttopic");
final String groupID = parameter.get("group.id","flink-demo-grp-id");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<EventMessage> source_1 = KafkaSource.<EventMessage>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(inputTopic_1).setGroupId(groupID)
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new EventSchema())
.build();
DataStream<EventMessage> text_1 = env.fromSource(source_1,
WatermarkStrategy
.<EventMessage>forBoundedOutOfOrderness(Duration.ofSeconds(300))
.withTimestampAssigner((event, trtimestamp)-> {
//System.err.println("Kafka ingetstion ts : " + trtimestamp);
//System.err.println("Event ts : "+ event.getTxnDate().getTime());
return event.getTxnDate().getTime();})
, "Kafka Source 1");
DataStream<EventMessage> partitionedInput = text_1.keyBy(evt -> evt.getAccountId());
//partitionedInput.print();
Pattern<EventMessage, ?> relaxedAlarmPattern = Pattern.<EventMessage>begin("first").subtype(EventMessage.class)
.where(new SimpleCondition<EventMessage>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(EventMessage value) throws Exception {
return value.getEvent().equalsIgnoreCase("PASSWORD_CHANGE_SUCC");
}
}).followedBy("second").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
Iterable<EventMessage> test = ctx.getEventsForPattern("first");
Integer accid = 0;
for (EventMessage te : test) {
accid = te.getAccountId();
}
return value.getEvent().equalsIgnoreCase("BENIFICIARY_ADDED")
&& value.getAccountId().equals(accid);
}
}).followedBy("third").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
Integer accid = 0;
Iterable<EventMessage> test = ctx.getEventsForPattern("first");
for (EventMessage te : test) {
accid = te.getAccountId();
}
return value.getEvent().equalsIgnoreCase("TXN_NEW")
&& value.getAccountId().equals(accid) && value.getAmt() <= 10;
}
}).followedBy("last").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
Integer accid = 0;
Iterable<EventMessage> test = ctx.getEventsForPattern("first");
for (EventMessage te : test) {
accid = te.getAccountId();
}
return value.getEvent().equalsIgnoreCase("TXN_NEW")
&& value.getAccountId().equals(accid) && value.getAmt() >= 100 ;
}
}).within(Time.seconds(300));
PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern)
.inEventTime();
//.inProcessingTime();
DataStream<String> alarms = patternStream.select(new PatternSelectFunction<EventMessage, String>() {
private static final long serialVersionUID = 1L;
@Override
public String select(Map<String, List<EventMessage>> pattern) throws Exception {
EventMessage first = (EventMessage) pattern.get("first").get(0);
EventMessage middle = (EventMessage) pattern.get("second").get(0);
EventMessage third = (EventMessage) pattern.get("third").get(0);
EventMessage last = (EventMessage) pattern.get("last").get(0);
return "WARNING : Possible fraud scenario [ Party ID " + first.getPartyId()
+ " recently changed his password and added a beneficiary and later made transcations of "
+ third.getAmt() + " and " + last.getAmt()+" ]";
}
});
alarms.print();
env.execute(" CEP ");
}
}
如果我把下面这行
PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inEventTime();
至
PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inProcessingTime();
代码工作,任何建议,我如何才能使它与inEventTime()方法.
1条答案
按热度按时间11dmarpk1#
通常Kafka源代码的问题是并行度高于分区数,或者不是所有分区都接收数据,这使得水印无法向前推进。您可以通过调整并行度或将
withIdleness
与水印策略一起使用来解决此问题。请参阅Kafka连接器文档中的更多信息。