我试图写一个应用程序,将与Kafka集成使用 Camel (版本-3.4.2)
我从这个问题的答案中借用了一种方法。
我有一个监听Kafka主题信息的路由。通过使用一个简单的执行器,将此消息的处理与消耗解耦。每个处理都作为一个任务提交给这个执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已经禁用了自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)是可以的,但是kafka中从未提交处理的消息不应该丢失(由于提交了偏移量)。现在来回答问题,
如何有效处理负载?例如,有1000条消息,但我一次只能并行处理100条。
现在我的解决方案是阻止消费者轮询线程并尝试连续提交作业。但暂停投票会是一个更好的方法,但我找不到任何方法来实现这一点在 Camel 。
有没有更好的方法(驼峰方法)将处理与消耗解耦并处理背压?
public static void main(String[] args) throws Exception {
String consumerId = System.getProperty("consumerId", "1");
ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
LOGGER.info("Consumer {} starting....", consumerId);
Main main = new Main();
main.init();
CamelContext context = main.getCamelContext();
ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
.autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
.register(context, "kafka");
ConsumerBean bean = new ConsumerBean();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("kafka:test").process(exchange -> {
LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
processTask(exchange);
commitOffset(exchange);
});
}
private void processTask(Exchange exchange) throws InterruptedException {
try {
executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
} catch (Exception e) {
LOGGER.error("Exception occured {}", e.getMessage());
Thread.sleep(1000);
processTask(exchange);
}
}
private void commitOffset(Exchange exchange) {
boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
if (lastOne) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
if (manual != null) {
LOGGER.info("manually committing the offset for batch");
manual.commitSync();
}
} else {
LOGGER.info("NOT time to commit the offset yet");
}
}
});
main.run();
}
1条答案
按热度按时间e5nqia271#
你可以用
throttle
用于此目的的eip。请看一下这里的原始文档。