kafka kstream-使用带窗口的abstractprocessor

2w2cym1i  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(421)

我希望将kstream中的窗口批输出组合在一起,并将它们写入辅助存储。
我本来想看看的 .punctuate() 大概每30秒打一次电话。我得到的却保存在这里。
(原始文件有几千行)
摘要- .punctuate() 似乎是随机的,然后是反复的。它似乎不符合通过processorcontext.schedule()设置的值。

编辑:

相同代码的另一次运行产生了对 .punctuate() 大约每四分钟。这次我没有看到疯狂的重复值。来源没有变化-只是结果不同。
使用以下代码:

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

处理器供应商

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}

编辑:

为了确保我的调试器不会减慢速度,我构建了它,并在kafka进程的同一个框中运行它。这一次,它甚至没有尝试延迟4分钟或更长时间——在几秒钟内,它向用户输出虚假呼叫 .punctuate() . 其中许多(大多数)都没有任何干预电话 .process() .

rjjhvcjd

rjjhvcjd1#

好吧-我想这是Kafka的一个错误。
原因如下:
在我最初的测试中,我使用一台机器来运行生产者和消费者。我会运行producer几分钟来生成一些测试数据,然后运行我的测试。这将提供我最初发布的奇怪输出。
然后我决定把制作人推到后台,让它继续运行。现在我看到100%完美的30秒通话间隔 .punctuate() . 没有问题了。
换句话说,如果kafka服务器没有处理任何入站数据,那么它似乎与运行kstreams进程不一致。

pieyvz9o

pieyvz9o2#

刚看完这个问题的答案,我想也回答了你的问题。其要点是:
streams使用者执行记录轮询
所有返回的记录都已处理完毕。
然后使用配置的延迟安排标点回调。
要点是,标点符号不是固定的时间间隔事件,而#2所用时间的变化将导致标点符号的执行周期发生相同的变化。
……但读了那个链接,他说得比我好。

l3zydbqr

l3zydbqr3#

更新:这部分答案适用于Kafka0.11或更低版本(Kafka1.0及更高版本见下文)
在Kafka流中,标点符号基于流时间和非系统时间(又名处理时间)。
每个默认流时间是事件时间,即嵌入在kafka记录本身中的时间戳。因为您没有设置非默认值 TimestampExtractor (见 timestamp.extractor 在http://docs.confluent.io/current/streams/developer-guide.html#optional-配置参数),调用 punctuate 仅取决于事件时间的处理过程与您处理的记录有关。因此,如果需要几分钟来处理“30秒”(事件时间)的记录, punctuate 呼叫频率将低于30秒(挂钟时间)。。。
这也可以解释你不规则的通话模式(即突发和长时间延迟)。如果您的数据事件时间确实“跳跃”,并且要处理的数据在您的主题中已经完全可用,那么kafka streams也会“跳跃”到内部维护的流时间。
我想,你可以通过使用 WallclockTimestampExtractor (见http://docs.confluent.io/current/streams/developer-guide.html#timestamp-提取器)
还有一件事要提:流时间只有在处理数据时才会提前——如果应用程序到达输入主题的末尾并等待数据, punctuate 不会被调用。即使您使用 WallclockTimestampExtractor .
顺便说一句:目前正在讨论流的标点符号行为:https://github.com/apache/kafka/pull/1689
Kafka1.0及更高版本的答案
自Kafka1.0以来,可以根据挂钟时间或事件时间注册标点:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

相关问题