无法在与其他源代码联合的apacheflink中的自定义源代码函数中休眠

avwztpqn  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(469)

我有两个来源,一个是Kafka源和一个是自定义源,我需要使睡眠自定义源一个小时,但我得到以下中断。

java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.hulu.hiveIngestion.HiveAddPartitionThread.run(HiveAddPartitionThread.java:48)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

代码:

<kafka_Source>.union(<custom_source>)

public class custom_source implements SourceFunction<String> {
public void run(SourceContext<String> ctx)  {
 while(true)
 {
  Thread.sleep(1000);
  ctx.collect("string");
 }
}
}

如何使睡眠自定义源,而Kafka源将继续其流。为什么我得到线程中断异常?

lh80um4z

lh80um4z1#

这是一个java问题,而不是flink问题。简而言之,您永远不能依赖thread.sleep(x)来睡眠x毫秒。正确地支持中断也很重要,否则您就不能优雅地关闭您的工作。

public class custom_source implements SourceFunction<String> {
    private static final Duration SLEEP_DURATION = Duration.ofHours(1);
    private volatile boolean isCanceled = false;

    public void run(SourceContext<String> ctx) {
        while (!isCanceled) {
            // 1 hour wait time
            LocalTime end = LocalTime.now().plusHours(1);
            // this loop ensures that random interruption is not prematurely closing the source
            while (LocalTime.now().compareTo(end) < 0) {
                try {
                    Thread.sleep(Duration.between(LocalTime.now(), end).toMillis());
                } catch (InterruptedException e) {
                    // swallow interruption unless source is canceled
                    if (isCanceled) {
                        Thread.interrupted();
                        return;
                    }
                }
            }
            ctx.collect("string");
        }
    }

    @Override
    public void cancel() {
        isCanceled = true;
    }
}

相关问题