我正在尝试使用 <KStream>.process()
用一个 TimeWindows.of("name", 30000)
将一些ktable值进行批处理并发送。似乎30秒超过了使用者超时时间间隔,在此时间间隔之后,kafka认为所述使用者已失效并释放分区。
我尝试过提高轮询和提交间隔的频率以避免这种情况:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
不幸的是,这些错误仍在发生:
(很多)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
接下来是:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
显然,我需要更频繁地将心跳发送回服务器。怎样?
我的拓扑是:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate2", 30000));
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
ktable每30秒按键对值进行分组。在processor.init()中,我调用 context.schedule(30000)
.
dbprocessorsupplier提供了dbprocessor的一个示例。这是abstractprocessor的一个实现,其中提供了所有重写。他们所做的就是记录,这样我就知道他们什么时候被击中了。
这是一个非常简单的拓扑结构,但很明显我在某个地方漏了一步。
编辑:
我知道我可以在服务器端对此进行调整,但我希望有一个客户端解决方案。我喜欢在客户机退出/死亡时分区很快可用的概念。
编辑:
为了简化问题,我从图中删除了聚合步骤。现在只是consumer->processor()(如果我直接把消费者送到 .print()
它工作很快,所以我知道它是好的)(类似地,如果我通过 .print()
看起来也不错)。
我发现 .process()
-应该是哪个电话 .punctuate()
实际上,每30秒阻塞的时间长度是可变的,输出是随机的(如果有的话)。
主程序
调试输出
处理器供应商
处理器
进一步:
我将调试级别设置为“debug”,然后重新运行。我看到很多信息:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
但这是一个断点 .punctuate()
函数没有被击中。所以它做了很多工作,但没有给我一个使用它的机会。
1条答案
按热度按时间wkyowqbh1#
一些澄清:
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生。基本上,kafka stream会在这段时间过后尽快提交,但无法保证执行下一次提交实际需要多长时间。StreamsConfig.POLL_MS_CONFIG
用于内部KafkaConsumer#poll()
调用,以指定poll()
打电话。因此,这两个值都无助于提高心跳频率。
kafka streams在处理记录时遵循“深度优先”策略。这意味着
poll()
对于每个记录,将执行拓扑的所有运算符。假设您有三个连续的Map,那么在处理下一个/第二个记录之前,将为第一个记录调用所有三个Map。因此,下一个
poll()
会打电话的,先录完之后poll()
已经完全处理了。如果你想更频繁地心跳,你需要确保poll()
调用获取的记录更少,因此处理所有记录所需的时间更少,下一个调用所需的时间也更少poll()
将提前触发。您可以将配置参数用于
KafkaConsumer
你可以通过StreamsConfig
完成这项工作(参见https://kafka.apache.org/documentation.html#consumerconfigs):streamconfig.put(consumerconfig.,value);
max.poll.records
:如果减小此值,将轮询较少的记录session.timeout.ms
:如果增加此值,则有更多的时间处理数据(添加此值是为了完整性,因为它实际上是一个客户端设置,而不是服务器/代理端配置--即使您知道此解决方案并且不喜欢它:))编辑
从Kafka开始
0.10.1
可以(并且建议)在streams config中为consumer和procuder配置加前缀。这避免了参数冲突,因为某些参数名称用于使用者和生产者,否则无法区分(并将同时应用于使用者和生产者)。为可以使用的参数添加前缀StreamsConfig#consumerPrefix()
或者StreamsConfig#producerPrefix()
分别是。例如:streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
还有一点需要补充:这个问题中描述的场景是一个已知的问题,已经有kip-62引入了KafkaConsumer
发送心跳,从而将心跳与poll()
电话。kafka streams将在即将发布的版本中利用这一新功能。