如何将spring集成频道调度到kafka?

kq0g1dla  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(205)

我是Kafka的新手,尝试创建一个spring启动应用程序,接受rest请求并将其发布到Kafka队列。我已经发布了代码片段。请求被接收,然后发送到 testlogchannel . 我预期基于配置,我有一个json将发布到kafka主题。我有一个进程使用命令来消耗消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testlog --from-beginning . 我注意到,这些消息从来没有进入kafka主题,而是以类型为的测试活页夹结束 TestSupportBinder . 令人惊讶的是,Kafka活页夹从来没有使用,即使我在配置设置 application.yml 以及 pom.xml . 我用 Java / Annotation 基于配置,不使用xml。
有人能指出我遗漏了什么吗?我怎样才能有通道发送消息到Kafka和使用Kafka活页夹?
我的application.yml代码段如下

spring:
    cloud:
        stream:
            default-binder: kafka
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                bindings:
                    testlogchannel:
                        destination: testlog

制片人频道类别:

public interface TestLogProducerChannel{

    @Output("testlogchannel")
    MessageChannel testLogWrite();
}

消息发送组件:

@Component
public class MessageProducerUtil {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducerUtil.class);

    private final MessageChannel consumer;

    public MessageProducerUtil(TestLogProducerChannel channel) {
        this.consumer = channel.testLogWrite();
    }

    public void sendMessage(TestLog log) {
        Message<TestLog> msg = MessageBuilder.withPayload(log).build();
        LOG.debug("Sending log object" + log);
        this.consumer.send(msg);
    }

}

我的maven依赖项如下所示:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-loader-tools</artifactId>
    </dependency>
    <!-- Kafka support -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>

我的休息控制器:

@RestController
@RequestMapping("/api/app/v1.0")
public class TestLogResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestLogResource.class);

    @Autowired
    private MessageProducerUtil producer;
    @PostMapping("testlog")
    public ResponseEntity<TestLog> postTestLog(@RequestBody(required = true) TestLog testLog) {
        producer.sendMessage(testLog);
        return new ResponseEntity<>(testLog, HttpStatus.OK);
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题