当检查点打开时,使用简单的cep循环模式
private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
.followedBy("middle").where(checkStatusOn).times(2)
.next("end").where(checkStatusOn).within(Time.minutes(5))
我看到了失败。
simplebinaryevent是
public class SimpleBinaryEvent implements Serializable {
private int id;
private int sequence;
private boolean status;
private long time;
public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
this.id = id;
this.sequence = sequence;
this.status = status;
this.time = time;
}
public int getId() {
return id;
}
public int getSequence() {
return sequence;
}
public boolean isStatus() {
return status;
}
public long getTime() {
return time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimpleBinaryEvent that = (SimpleBinaryEvent) o;
if (getId() != that.getId()) return false;
if (isStatus() != that.isStatus()) return false;
if (getSequence() != that.getSequence()) return false;
return getTime() == that.getTime();
}
@Override
public int hashCode() {
//return Objects.hash(getId(),isStatus(), getSequence(),getTime());
int result = getId();
result = 31 * result + (isStatus() ? 1 : 0);
result = 31 * result + getSequence();
result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
return result;
}
@Override
public String toString() {
return "SimpleBinaryEvent{" +
"id='" + id + '\'' +
", status=" + status +
", sequence=" + sequence +
", time=" + time +
'}';
}
}
故障原因:
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
我确信我已经按它应该的方式实现了equals()和hashcode()。我也试过objects.hashcode。在其他情况下,我在sharedbuffer.tostring()上有循环引用(因此也有stackoverflow),它再次指出了引用的问题(相等和不相等)。如果不打开检查点,它将按预期工作。我正在本地群集上运行。cep生产准备好了吗?
我使用的是1.3.2 flink
1条答案
按热度按时间ssgvzors1#
非常感谢您尝试了图书馆和报告这一点!
随着越来越多的功能添加到该库中,该库正在积极开发中。1.3是这个库的第一个版本,它的语义如此丰富,因此我们希望看到1)人们如何使用它,2)是否有任何bug。所以我想说,它不是100%的生产准备,但它是不远。
现在对于手头的问题,我想您正在使用rocksdb进行检查点,对吗?我假设的原因是,使用rocksdb,在每个水印(在事件时间)您都会反序列化必要的状态(例如
NFA
),处理一些事件,然后在将其放回rocksdb之前再次序列化。文件系统状态后端不是这种情况,只有在检查点设置时才序列化状态,只有在恢复时才读取并反序列化状态。所以在本例中,假设您说过,如果不检查您的工作是否正常,那么只有在从失败中恢复之后,您才会看到这个问题。
问题的根源可能是
equals()
/hashcode()
是bug(似乎不是这样),或者在序列化/反序列化cep状态的方式上有问题。你能不能提供一个最小的事件输入序列来产生这种情况?这对再现问题非常有帮助。
多谢,科斯塔斯