我参考这个示例,使用 Spring Integration 读取文件,它工作得很好;但是当我尝试把文件内容发送给 Kafka producer 时,它就不工作了。我在网上查过这个问题,但没有找到有用的信息。这是我的代码:
文件:messageprocessingintegrationflow.java:
/**
 * Assembles the integration flow that starts at
 * {@code ApplicationConfiguration.INBOUND_CHANNEL}, upper-cases each payload,
 * passes it to {@code loggingHandler()} and then to
 * {@code kafkaProducerMessageHandler()}.
 *
 * @return the assembled flow
 */
@Bean
public IntegrationFlow writeToFile() {
    return IntegrationFlows
            .from(ApplicationConfiguration.INBOUND_CHANNEL)
            // The payload is cast to String, so a non-String payload fails with ClassCastException.
            .transform(payload -> ((String) payload).toUpperCase())
            // .handle(fileWritingMessageHandler)
            // NOTE(review): loggingHandler() is defined outside this excerpt; if it is a
            // one-way handler the flow may not be able to continue past it -- confirm.
            .handle(loggingHandler())
            .handle(kafkaProducerMessageHandler())
            .get();
}
//producing channel
/**
 * Declares the {@code kafkaChannel} bean. This is the channel name used as the
 * {@code inputChannel} of the {@code @ServiceActivator} on
 * {@code kafkaProducerMessageHandler()}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean(name = "kafkaChannel")
public DirectChannel kafkaChannel() {
    DirectChannel producingChannel = new DirectChannel();
    return producingChannel;
}
/**
 * Declares the channel that {@code kafkaMessageDrivenChannelAdapter()} sets as
 * its output channel.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public DirectChannel consumingChannel() {
    DirectChannel inboundFromKafka = new DirectChannel();
    return inboundFromKafka;
}
/**
 * Creates the outbound handler that publishes messages through
 * {@code kafkaTemplate()}. The topic is the literal value of {@code kafkaTopic}
 * and every record uses the fixed key {@code "kafka-integration"}.
 *
 * <p>The handler is also registered as a service activator on {@code kafkaChannel}.
 *
 * @return the configured Kafka producer handler
 */
@Bean
@ServiceActivator(inputChannel = "kafkaChannel")
public MessageHandler kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> producerHandler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());

    // Both expressions are literals: topic and key are the same for every message.
    LiteralExpression topic = new LiteralExpression(kafkaTopic);
    LiteralExpression messageKey = new LiteralExpression("kafka-integration");
    producerHandler.setTopicExpression(topic);
    producerHandler.setMessageKeyExpression(messageKey);

    return producerHandler;
}
/**
 * Creates the {@code KafkaTemplate} used for sending, backed by
 * {@code producerFactory()}.
 *
 * @return a template with {@code String} keys and values
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    ProducerFactory<String, String> factory = producerFactory();
    return new KafkaTemplate<>(factory);
}
/**
 * Creates the producer factory from the settings in {@code producerConfigs()}.
 *
 * @return a producer factory with {@code String} keys and values
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> settings = producerConfigs();
    return new DefaultKafkaProducerFactory<>(settings);
}
/**
 * Builds the Kafka producer settings: bootstrap servers taken from
 * {@code bootstrapServers}, {@code StringSerializer} for both keys and values,
 * and a linger time of 1 ms.
 *
 * @return a mutable map of producer configuration entries
 */
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    // Keys and values are both serialized as strings.
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Delay each send slightly so that more messages can accumulate first.
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    return config;
}
//consumer configuration....
/**
 * Creates the inbound adapter that wraps {@code kafkaListenerContainer()} and
 * sends its output to {@code consumingChannel()}.
 *
 * @return the configured message-driven channel adapter
 */
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter<String, String> inboundAdapter =
            new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
    DirectChannel target = consumingChannel();
    inboundAdapter.setOutputChannel(target);
    return inboundAdapter;
}
/**
 * Creates the listener container for the topic named by {@code kafkaTopic},
 * using {@code consumerFactory()}.
 *
 * <p>The cast is unchecked because {@code consumerFactory()} is declared with
 * wildcard type arguments; {@code consumerConfigs()} sets
 * {@code StringDeserializer} for both keys and values.
 *
 * @return the listener container typed for {@code String} keys and values
 */
@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
    // The container properties carry the topic name to subscribe to.
    ContainerProperties topicProperties = new ContainerProperties(kafkaTopic);
    ConcurrentMessageListenerContainer<?, ?> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory(), topicProperties);
    return (ConcurrentMessageListenerContainer<String, String>) container;
}
/**
 * Creates the consumer factory from the settings in {@code consumerConfigs()}.
 *
 * <p>The return type is declared as {@code ConsumerFactory<String, String>},
 * mirroring {@code producerFactory()}, because {@code consumerConfigs()} sets
 * {@code StringDeserializer} for both keys and values. The value remains
 * assignable wherever a {@code ConsumerFactory<?, ?>} is expected.
 *
 * @return a consumer factory with {@code String} keys and values
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
 * Builds the Kafka consumer settings: bootstrap servers taken from
 * {@code bootstrapServers}, {@code StringDeserializer} for both keys and
 * values, consumer group {@code "helloworld"}, and offset reset
 * {@code "earliest"}.
 *
 * @return a mutable map of consumer configuration entries
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

    // Keys and values are both deserialized as strings.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Automatically reset the offset to the earliest offset.
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return config;
}
这是stacktrace:
org.springframework.context.ApplicationContextException: Failed to start bean 'kafkaListenerContainer'; nested exception is java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at com.porterhead.Application.main(Application.java:25) [classes/:na]
Caused by: java.lang.IllegalArgumentException: A org.springframework.kafka.listener.KafkaDataListener implementation must be provided
at org.springframework.util.Assert.isTrue(Assert.java:92) ~[spring-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:199) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) ~[spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
... 12 common frames omitted
我不知道我做错了什么。如果你需要更多的细节,请告诉我。谢谢。
1条答案
按热度按时间dfty9e191#
KafkaProducerMessageHandler 是单向组件,不产生应答。它只是把消息发布到 Kafka 的主题,除此之外什么也不做。因此,你不能像现在用 handle(loggingHandler()) 那样在它之后继续流程。
KafkaProducerMessageHandler 必须是流中的最后一个端点。它不同于 FileWritingMessageHandler,后者是一个 AbstractReplyProducingMessageHandler,可以让流程继续下去。
另外,今后提问时请恰当地描述问题:预期的行为是什么、实际出了什么错。这个回答只是我的最佳猜测,因为我了解所有这些组件的代码。