How to set KafkaHeaders.MESSAGE_KEY in Spring Integration

oogrdqng  posted on 2021-06-04  in Kafka

Can someone help me set KafkaHeaders.MESSAGE_KEY in the code below, so that I can publish payloads to different partitions?

@Bean
public IntegrationFlow fileInboundChannelFlow() {
    FileInboundChannelAdapterSpec messageSourceSpec = Files
            .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());

    messageSourceSpec = messageSourceSpec.filter(getFilter());
    //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
    messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());

    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
            .split(new FileSplitter(true, true))
            .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));

    return flowBuilder.<Object, Class<?>>route(Object::getClass,
            m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
                    "lines.input"))
            .get();
}

I want to set the key based on payload.prod_cd. Since the flow is a singleton bean, I want the Kafka key header to be computed separately for each payload.

amrnrhlw  #1

You are almost there with this line:

.enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));

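header() stores the string "payload.flightNumber" as a literal value, the same for every message. You just need to use headerExpression() instead, which evaluates the string as a SpEL expression against each message: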

/**
 * Add a single header specification where the value is a String representation of a
 * SpEL {@link Expression}. If the header exists, it will <b>not</b> be overwritten
 * unless {@link #defaultOverwrite(boolean)} is true.
 * @param name the header name.
 * @param expression the expression.
 * @return the header enricher spec.
 */
public HeaderEnricherSpec headerExpression(String name, String expression) {
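
Applied to the flow from the question, the change could look roughly like the fragment below. This is only a sketch: it reuses messageSourceSpec from the question, and it assumes the payload exposes a prod_cd property at the point where the header is enriched, which the question does not show. Right after the FileSplitter the payload is a String line or a FileMarker, so the lines would probably need to be converted to an object first, or the enricher moved into the flow that handles the lines.

```java
// Fragment of fileInboundChannelFlow() from the question, not a complete class.
// headerExpression() evaluates the SpEL string for every message, so each
// payload gets its own key even though the flow bean is a singleton.
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
        .split(new FileSplitter(true, true))
        // Assumption: the payload has a prod_cd property here. For a raw
        // String line or a FileMarker this expression would fail to evaluate.
        .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.prod_cd"));
```

Per the Javadoc above, an existing header with the same name is not overwritten unless defaultOverwrite(true) is set on the spec.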
