有人能帮我设置一下吗 KafkaHeaders.MESSAGE_KEY
下面的代码,以便我可以发布到不同的分区有效负载。
@Bean
public IntegrationFlow fileInboundChannelFlow() {
FileInboundChannelAdapterSpec messageSourceSpec = Files
.inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
messageSourceSpec = messageSourceSpec.filter(getFilter());
//messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
.split(new FileSplitter(true, true))
.enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
return flowBuilder.<Object, Class<?>>route(Object::getClass,
m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
"lines.input"))
.get();
}
我想根据payload.prod\u cd设置密钥。因为是单例bean,所以我想为每个负载初始化不同的kafka头键。
1条答案
按热度按时间amrnrhlw1#
你说的对:
你只需要使用
headerExpression()
取而代之的是: