Unable to consume Kafka messages using Apache Camel

ijnw1ujt  posted on 2021-06-06  in Kafka

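I am unable to consume Kafka messages with Apache Camel when running locally. When the application starts, I do not see any Kafka consumer properties in the output. Below is my Kafka consumer code: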

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessageConsumerClient {

    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerClient.class);

    private MessageConsumerClient() {
    }

    public static void main(String[] args) throws Exception {

        LOG.info("About to run Kafka-camel integration...");

        CamelContext camelContext = new DefaultCamelContext();

        // Add a route that consumes messages from Kafka and logs each body

        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
                pc.setLocation("classpath:application.properties");

                log.info("About to start route: Kafka Server -> Log ");

                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                        + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
                        + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}").routeId("FromKafka")
                                .log("${body}");
            }
        });
        camelContext.start();

        // Keep the JVM alive for five minutes so the consumer has time to poll
        Thread.sleep(5 * 60 * 1000);

        camelContext.stop();
    }

}
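The route above depends on seven placeholders being resolvable from application.properties: consumer.topic, kafka.host, kafka.port, consumer.maxPollRecords, consumer.consumersCount, consumer.seekTo and consumer.group. As a rough sanity check that does not involve Camel at all, the sketch below substitutes those placeholders by hand and prints the resulting endpoint URI. The class name EndpointUriCheck, the resolve helper and every property value shown are hypothetical and only for illustration; the real values belong in the project's own application.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Camel: resolves {{key}} placeholders by hand.
final class EndpointUriCheck {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

    // Replaces every {{key}} in the template with its value from props,
    // failing loudly if a key is missing.
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        // Assumed example values; the original post does not show the file.
        String fileContent = String.join("\n",
                "kafka.host=localhost",
                "kafka.port=9092",
                "consumer.topic=test-topic",
                "consumer.maxPollRecords=5000",
                "consumer.consumersCount=1",
                "consumer.seekTo=beginning",
                "consumer.group=demo-group");
        Properties props = new Properties();
        props.load(new StringReader(fileContent));

        // Same URI template as the route in the question.
        String template = "kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}&groupId={{consumer.group}}";

        System.out.println(resolve(template, props));
    }
}
```

If a key is missing from the file, the helper throws with the name of the missing property, which may help narrow down whether the problem is in the configuration or elsewhere.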

I am using the following dependencies in pom.xml:

  1. camel-core 2.21.1
  2. camel-kafka 2.21.1

pom.xml file

<dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.21.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.21.1</version>
        </dependency>
    </dependencies>

Any help would be appreciated, as I cannot see any log output when the application runs.
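One possible explanation for the missing log output, offered only as an assumption, is that no SLF4J logging backend is on the classpath: the pom.xml shown lists only camel-core and camel-kafka, and with SLF4J 1.x the API falls back to a no-operation logger when no binding is found. The sketch below reports which logger factory SLF4J has bound. It uses reflection purely so that it compiles without slf4j-api; the class name LoggingBindingCheck is hypothetical.

```java
// Hypothetical diagnostic: reports which SLF4J logger factory is active.
final class LoggingBindingCheck {

    // Returns the class name of the bound ILoggerFactory, or a short
    // message if slf4j-api is absent or cannot be queried.
    static String describeBinding() {
        try {
            Class<?> factory = Class.forName("org.slf4j.LoggerFactory");
            Object bound = factory.getMethod("getILoggerFactory").invoke(null);
            return bound.getClass().getName();
        } catch (ClassNotFoundException e) {
            return "slf4j-api not on classpath";
        } catch (ReflectiveOperationException e) {
            return "could not query SLF4J: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println("SLF4J logger factory: " + describeBinding());
    }
}
```

If this prints org.slf4j.helpers.NOPLoggerFactory, log statements are being discarded, and adding a logging backend to the dependencies would be the next thing to try.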
