Apache Camel Kafka: aggregating before producing messages loses the headers

Asked by 4ioopgfo on 2022-11-07 in Apache

I am using Apache Camel Kafka with Spring Boot:
<camel.version>3.14.2</camel.version>
and the default configuration of the Apache Camel Kafka component:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-kafka-starter</artifactId>
    <version>${camel.version}</version>
</dependency>

My route consumes a file (with camel-file) that has 6000 lines:

from(fileConsume)
    .split(body().tokenize())
        .setHeader("testHeader", constant("valueHeader"))
    .aggregate(new GroupedMessageAggregationStrategy())
        .constant(true)
        .completionTimeout(100L)
    .to("kafka:topicTest");

All the messages from the file are produced to Kafka very quickly (in less than 2 seconds), but the header is not present.
When I remove the aggregation:

from(fileConsume).split(body().tokenize()).setHeader("testHeader", constant("valueHeader")).to("kafka:topicTest");

all the messages from the file are produced to Kafka very slowly (it takes more than 10 minutes), but the header is present.

I need some help to produce the messages with the Apache Camel Kafka component both quickly and with the headers.

ndasle7k #1

You have to do this in order to keep the header while the aggregation is running:

from("sftp://xxxxxxx@localhost:"
        + "2222/data/in"
        + "?password="
        + "&preferredAuthentications=publickey"
        + "&knownHostsFile=~/.ssh/known_hosts"
        + "&privateKeyFile=xxxxxxx"
        + "&privateKeyPassphrase="
        + "&passiveMode=true"
        + "&fastExistsCheck=true"
        + "&download=true"
        + "&delete=true"
        + "&stepwise=false"
        + "&antInclude=*"
        + "&antExclude=**reject**"
        + "&recursive=false"
        + "&maxMessagesPerPoll=10"
        + "&initialDelay=0"
        + "&delay=0"
        + "&connectTimeout=10000"
        + "&soTimeout=300000"
        + "&timeout=30000"
        + "&shuffle=true"
        + "&eagerMaxMessagesPerPoll=false"
        + "&moveFailed=reject"
        + "&binary=true"
        + "&localWorkDirectory=/opt/camel_data/kafka/"
        + "&readLock=none"
        + "&readLockCheckInterval=1000"
        + "&readLockMinLength=1"
        + "&readLockLoggingLevel=INFO"
        + "&readLockIdempotentReleaseDelay=10000"
        + "&readLockRemoveOnCommit=false"
        + "&readLockRemoveOnRollback=true"
        + "&bulkRequests=1000"
        + "&charset=utf-8")
        .routeId("Consume SFTP")
        .id("Consume SFTP")
        .setProperty("yoda_core_technical_id").header(Exchange.BREADCRUMB_ID)
        .setProperty("x_filename_source").header(Exchange.FILE_NAME_ONLY)
        .setProperty("x_filepath_source").header("CamelFileAbsolutePath")
        .setProperty("x_correlation_id").header("CamelFileName")
        .split(body().tokenize())
            .setHeader("test",constant("test"))
        .aggregate(new GroupedMessageAggregationStrategy())
            .constant(true)
            .completionTimeout(100L)
        .to("direct:aggregate");

from("direct:aggregate")
.process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange);
                GenericFileMessage<String> message =(GenericFileMessage<String>) exchange.getMessage().getBody(List.class).get(0);
                exchange.getMessage().setHeader("test",
                        message.getHeader("test"));
            }
        })
        .to("mock:result");
