使用ApacheCamel和kafka时如何处理背压?

esbemjvw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(464)

我试图写一个应用程序,将与Kafka集成使用 Camel (版本-3.4.2)
我从这个问题的答案中借用了一种方法。
我有一个监听Kafka主题信息的路由。通过使用一个简单的执行器,将此消息的处理与消耗解耦。每个处理都作为一个任务提交给这个执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已经禁用了自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)是可以的,但是kafka中从未提交处理的消息不应该丢失(由于提交了偏移量)。现在来回答问题,
如何有效处理负载?例如,有1000条消息,但我一次只能并行处理100条。
现在我的解决方案是阻止消费者轮询线程并尝试连续提交作业。但暂停投票会是一个更好的方法,但我找不到任何方法来实现这一点在 Camel 。
有没有更好的方法(驼峰方法)将处理与消耗解耦并处理背压?

public static void main(String[] args) throws Exception {
        String consumerId = System.getProperty("consumerId", "1");
        ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        LOGGER.info("Consumer {} starting....", consumerId);

        Main main = new Main();
        main.init();

        CamelContext context = main.getCamelContext();
        ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
                .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
                .register(context, "kafka");

        ConsumerBean bean = new ConsumerBean();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("kafka:test").process(exchange -> {
                    LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
                    processTask(exchange);
                    commitOffset(exchange);
                });
            }

            private void processTask(Exchange exchange) throws InterruptedException {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                } catch (Exception e) {
                    LOGGER.error("Exception occured {}", e.getMessage());
                    Thread.sleep(1000);
                    processTask(exchange);
                }
            }

            private void commitOffset(Exchange exchange) {
                boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
                if (lastOne) {
                    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
                            KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("manually committing the offset for batch");
                        manual.commitSync();
                    }
                } else {
                    LOGGER.info("NOT time to commit the offset yet");
                }
            }
        });

        main.run();
    }
e5nqia27

e5nqia271#

你可以用 throttle 用于此目的的eip。

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

请看一下这里的原始文档。

相关问题