我希望将kstream中的窗口批输出组合在一起,并将它们写入辅助存储。
我本来想看看的 .punctuate()
大概每30秒打一次电话。我得到的却保存在这里。
(原始文件有几千行)
摘要- .punctuate()
似乎是随机的,然后是反复的。它似乎不符合通过processorcontext.schedule()设置的值。
编辑:
相同代码的另一次运行产生了对 .punctuate()
大约每四分钟。这次我没有看到疯狂的重复值。来源没有变化-只是结果不同。
使用以下代码:
主要
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
处理器
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
处理器供应商
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
编辑:
为了确保我的调试器不会减慢速度,我构建了它,并在kafka进程的同一个框中运行它。这一次,它甚至没有尝试延迟4分钟或更长时间——在几秒钟内,它向用户输出虚假呼叫 .punctuate()
. 其中许多(大多数)都没有任何干预电话 .process()
.
3条答案
按热度按时间rjjhvcjd1#
好吧-我想这是Kafka的一个错误。
原因如下:
在我最初的测试中,我使用一台机器来运行生产者和消费者。我会运行producer几分钟来生成一些测试数据,然后运行我的测试。这将提供我最初发布的奇怪输出。
然后我决定把制作人推到后台,让它继续运行。现在我看到100%完美的30秒通话间隔
.punctuate()
. 没有问题了。换句话说,如果kafka服务器没有处理任何入站数据,那么它似乎与运行kstreams进程不一致。
pieyvz9o2#
刚看完这个问题的答案,我想也回答了你的问题。其要点是:
streams使用者执行记录轮询
所有返回的记录都已处理完毕。
然后使用配置的延迟安排标点回调。
要点是,标点符号不是固定的时间间隔事件,而#2所用时间的变化将导致标点符号的执行周期发生相同的变化。
……但读了那个链接,他说得比我好。
l3zydbqr3#
更新:这部分答案适用于Kafka0.11或更低版本(Kafka1.0及更高版本见下文)
在Kafka流中,标点符号基于流时间和非系统时间(又名处理时间)。
每个默认流时间是事件时间,即嵌入在kafka记录本身中的时间戳。因为您没有设置非默认值
TimestampExtractor
(见timestamp.extractor
在http://docs.confluent.io/current/streams/developer-guide.html#optional-配置参数),调用punctuate
仅取决于事件时间的处理过程与您处理的记录有关。因此,如果需要几分钟来处理“30秒”(事件时间)的记录,punctuate
呼叫频率将低于30秒(挂钟时间)。。。这也可以解释你不规则的通话模式(即突发和长时间延迟)。如果您的数据事件时间确实“跳跃”,并且要处理的数据在您的主题中已经完全可用,那么kafka streams也会“跳跃”到内部维护的流时间。
我想,你可以通过使用
WallclockTimestampExtractor
(见http://docs.confluent.io/current/streams/developer-guide.html#timestamp-提取器)还有一件事要提:流时间只有在处理数据时才会提前——如果应用程序到达输入主题的末尾并等待数据,
punctuate
不会被调用。即使您使用WallclockTimestampExtractor
.顺便说一句:目前正在讨论流的标点符号行为:https://github.com/apache/kafka/pull/1689
Kafka1.0及更高版本的答案
自Kafka1.0以来,可以根据挂钟时间或事件时间注册标点:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2