我是Kafka的新手,尝试创建一个spring启动应用程序,接受rest请求并将其发布到Kafka队列。我已经发布了代码片段。请求被接收,然后发送到 testlogchannel
. 我预期基于配置,我有一个json将发布到kafka主题。我有一个进程使用命令来消耗消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testlog --from-beginning
. 我注意到,这些消息从来没有进入kafka主题,而是以类型为的测试活页夹结束 TestSupportBinder
. 令人惊讶的是,Kafka活页夹从来没有使用,即使我在配置设置 application.yml
以及 pom.xml
. 我用 Java
/ Annotation
基于配置,不使用xml。
有人能指出我遗漏了什么吗?我怎样才能有通道发送消息到Kafka和使用Kafka活页夹?
我的application.yml代码段如下
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers: localhost
zk-nodes: localhost
bindings:
testlogchannel:
destination: testlog
制片人频道类别:
public interface TestLogProducerChannel{
@Output("testlogchannel")
MessageChannel testLogWrite();
}
消息发送组件:
@Component
public class MessageProducerUtil {
private static final Logger LOG = LoggerFactory.getLogger(MessageProducerUtil.class);
private final MessageChannel consumer;
public MessageProducerUtil(TestLogProducerChannel channel) {
this.consumer = channel.testLogWrite();
}
public void sendMessage(TestLog log) {
Message<TestLog> msg = MessageBuilder.withPayload(log).build();
LOG.debug("Sending log object" + log);
this.consumer.send(msg);
}
}
我的maven依赖项如下所示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-loader-tools</artifactId>
</dependency>
<!-- Kafka support -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
我的休息控制器:
@RestController
@RequestMapping("/api/app/v1.0")
public class TestLogResource {
private static final Logger LOGGER = LoggerFactory.getLogger(TestLogResource.class);
@Autowired
private MessageProducerUtil producer;
@PostMapping("testlog")
public ResponseEntity<TestLog> postTestLog(@RequestBody(required = true) TestLog testLog) {
producer.sendMessage(testLog);
return new ResponseEntity<>(testLog, HttpStatus.OK);
}
}
暂无答案!
目前还没有任何答案,快来回答吧!