java:当一个事件实际上丢失时,如何对收集事件msg的类设置时间限制

nsc4cvqm  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(376)

我想找到一种方法来防止“collector”类永远等待,因为它的内容是不完整的,而且它无法知道丢失的事件消息不存在,无法传递给它。
我有一个名为eventset的类,它具有从jms onmessage侦听器传递给它的相关事件msg(来自监视)。每个相关事件都有一个序列号,最后一个事件称为“txnend”,它的序列号最高。当事件msgs的总数等于txnend的序列号时,我知道所有事件都存在,并且我的代码指示回调类处理事件集。
当缺少一个事件时,eventset的示例会一直耐心地等待。
我解决这个问题的想法是,在创建类时,以某种方式对等待时间进行限制。当超过这个值时,我的代码应该指示回调处理不完整的事件集。
在过去的2/3个月里,我曾在密歇根州的两个场合,根据建议和想法,尝试这个和那个,但都没有成功。直到现在,这是可能的停车,因为它几乎从来没有发生过。现在,我确实需要解决它。
从本质上说,这似乎是一个相当普遍的情况,应该有一个直接的解决办法。我没找到一个。
如有任何建议或解决办法,我将不胜感激。

scyqe7ek

scyqe7ek1#

解决方案是使用计时器和timertask(作为内部类)。如果接收到所有相关事件,则构造函数将创建计时器并按所需的秒数调度timertask,然后调用回调并取消计时器。在少有的情况下,如果事件丢失,timertask会启动并向回调发送一组不完整的事件并记录错误。
仅供参考。
公共类事件集{

private MonitorEventsConsumer eventsConsumer;

// For timeout handling
private Timer timer;
private int waitTime; // seconds to wait 

protected boolean isComplete;

private String localTxnId;
String eventSourceAddress;
private boolean txnEndReceived;
private int maxCounter = 0;
private List<String> monEvents = new ArrayList<>();

public EventSet(String localTxnId) {
    this(localTxnId, 20); // Default the waiting time for all events to be received to 20 secs
}
public EventSet(String localTxnId, int waitTime) {
    this.localTxnId = localTxnId;
    timer = new Timer();
    this.waitTime = waitTime;
    timer.schedule(new TimeoutTask(), waitTime*1000);
}

public boolean isNew() {
    return monEvents.isEmpty();
}

public void addEvent(String eventAsXML) {

    monEvents.add(eventAsXML);
    String counter = StringUtils.substringBetween(eventAsXML, "wmb:counter=\"", "\"/>");
    eventSourceAddress = StringUtils.substringBetween(eventAsXML, " wmb:eventSourceAddress=\"", "\">");
    if (eventSourceAddress.endsWith("transaction.End")) {
        txnEndReceived = true;
    }

    int iCounter = Integer.parseInt(counter);
    if (iCounter > maxCounter) {
        maxCounter = iCounter;
    }

    if (txnEndReceived && maxCounter == monEvents.size()) {
        // all events should have been received unless maxCounter = 1
        // Call back to the onMessageHandler to process this event set.
        eventsConsumer.sendEventsToNjams(localTxnId, this);
        isComplete = true;
        timer.cancel();
    }
}

public List<String> getMonEvents() {
    return monEvents;
}

public void clear() {
    monEvents.clear();
    localTxnId = null;
    isComplete = false;
    maxCounter = 0;
}

public void registerHandler(final MonitorEventsConsumer eventsConsumer) {
    if (this.eventsConsumer == null) {
        this.eventsConsumer = eventsConsumer;
    }
}

class TimeoutTask extends TimerTask {
    public void run() {            
        eventsConsumer.sendEventsToNjams(localTxnId, EventSet.this);
        isComplete = true;
        System.err.println(String.format("Message events for local txn id '%s' were incomplete after waiting for %d seconds. they were sent to nJAMS in this state", localTxnId, waitTime));
        timer.cancel(); //Terminate the timer thread
    }
}

}

eeq64g8w

eeq64g8w2#

我不知道你的库是否支持这种方法,但是一个普通的java尝试会像这样:

public class MessageProcessor extends Thread {
  private static final long PROCESSOR_TIMEOUT   = 3600 * 1000 /* 1 hour? */;
  private static final long ONE_MESSAGE_TIMEOUT = 1000 /* 1 sec? */;
  private long              m_startTime         = 0;

  public void startProcessing() {
    m_startTime = System.currentTimeMillis();
    start();
  }

  @Override
  public void run() {
    while(true) {
      // ---------------------------------
      // check abort condition
      // ---------------------------------
      if(System.currentTimeMillis() - m_startTime > PROCESSOR_TIMEOUT) {
        break;
      }

      // ---------------------------------
      // handle 1 message (with timeout)
      // ---------------------------------
      try {
        // pseudo code
        collectOneMessage(ONE_MESSAGE_TIMEOUT);
      }
      catch(InterruptedException e) {
        // ignoring timeout exception
      }
    }

    // ---------------------------------
    // finish up
    // ---------------------------------
    // ...
  }
}

相关问题