触发后取消Kafka流上的标点符号

3pmvbmvn  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(344)

我在一个转换器上创建了一个定时标点器,并将其定期运行(使用kafka v2.1.0)。每次我接受一个特定的密钥时,我都会创建一个这样的新密钥

scheduled = context.schedule (Duration.ofMillis(scheduleTime),
             PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));

我的问题是,所有这些标点符号我创建运行不断,我找不到一种方法来取消他们。我在网上找到了一个片段

private Cancellable scheduled;

    @Override
    public void init(PorcessorContext processContext) {
        this.context = processorContext;
        scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
                                     this::punctuateCancel);
    }

    private void punctuateCancel(long timestamp) {
        scheduled.cancel();
    }

但不幸的是,这似乎只取消了最新创建的标点符号。
我正在编辑我的文章,只是想进一步了解我的方法,以及这与沃兹尼亚的评论有何关系。所以我的方法非常类似,只是使用一个Map,因为每个事件只需要一个标点或活动键,所以在我的transformer类中启动

private Map<String,Cancellable> scheduled  = new HashMap<>();

在我的转换方法中,我执行下面的代码

{
 final Cancellable cancelSched = scheduled.get(recordKey);

 // Every time I get a new event I cancel my previous Punctuator
 // and schedule a new one ( context.schedule a few lines later)

 if(cancelSched != null)
    cancelSched.cancel();

 // This is supposed to work like a closure by capturing the currentCancellable which in the next statement
 // is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
 // specific String as it is constantly renewed
 // Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
 // statement (cancelSched.cancel();)

 Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
                new CustomPunctuator(context, recordKey ,()-> scheduled ));

 // Update Active Punctuators for a specific key.

 scheduled.put(recordKey,currentCancellable);   
}

我在我的标点符号或标点符号方法上使用注册回调来取消最后一个活动标点符号或在它启动之后。它似乎工作(虽然不确定),但它感觉非常“黑客”,并不是那种解决方案,它肯定是可取的。
所以我怎样才能取消一个标点符号或触发后。有办法解决这个问题吗?

drnojrws

drnojrws1#

我也遇到过同样的情况,只是为了那些对scala感兴趣的人,我把它当作一个例子来处理

val punctuation = new myPunctuation()
val scheduled:Cancellable=context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, punctuation)
punctuation.schedule=scheduled

班级

class myPunctuation() extends Punctuator{
      var schedule: Cancellable = _
      override def punctuate(timestamp: Long): Unit = {
      println("hello")
      schedule.cancel()
      }

    }

很有魅力

8yparm6h

8yparm6h2#

我认为你可以做以下一件事:

class CustomPunctuator implements Punctuator {
  final Cancellable schedule;

  public void punctuate(final long timestamp) {
    // business logic
    if (/* do cancel */) {
      schedule.cancel()
    }
  }
}

// registering a punctuation
{
  final CustomPunctuator punctuation = new CustomPunctuator();
  final Cancellable currentCancellable = context.schedule(
    Duration.ofMillis(scheduleTime),
    PunctuationType.WALL_CLOCK_TIME,
    punctuation);

  punctuation.schedule = currentCancellable;
}

这样,您就不需要维护 HashMap 给每个人 CustomPunctuator 一种取消自身的方法。

相关问题