java—测试SpringCloudStreams服务的问题

yh2wf1be  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(412)

我在测试我的spring云流服务(它写入kafka流)时遇到问题。它的基础是下面的baeldung介绍
这是服务代码(细节省略)

@Service
@EnableBinding(Source.class)
public class KafkaWriterService {

    @SendTo(Source.OUTPUT)
    public String write(String str) {
        return str;
    }

}

这是测试

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaWriterServiceTest {

    @Autowired
    private Source source;

    @Autowired
    private MessageCollector collector;

    @Autowired
    private KafkaWriterService service;

    @Test
    public void testMessages() {
        BlockingQueue<Message<?>> messages = collector.forChannel(source.output());
        service.write("FooBar");

        Object payload = messages.poll().getPayload();
        System.out.println(payload);
    }

很直接,但是当执行测试时,我得到一个nullpointerexception,因为poll返回一个null。
知道问题是什么吗?
谢谢!

igsr9ssn

igsr9ssn1#

更新:
如果您只是想找一个Kafka制作人的例子,那么请对您的服务进行以下更改:

@Service
@EnableBinding({Source.class})
public class KafkaWriterService {

    private final MessageChannel output;

    public KafkaWriterService(@Qualifier("output") MessageChannel output) {
        this.output = output;
    }

    public void write(String str) {
        output.send(MessageBuilder.withPayload(str).build());
    }
}
``` `@SendTo` 需要通过的消息源 `@StreamListener` . 它用于转换输入消息。
在测试程序中,没有生成任何消息。
将代码更改为以下内容,以便在测试中生成消息:

@Service
@EnableBinding({Source.class, Sink.class})
public class KafkaWriterService {

@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String write(String str) {
    return "Transformed: " + str;
}

}

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaWriterServiceTest {

@Autowired
private Sink sink;

@Autowired
private Source source;

@Autowired
private MessageCollector collector;

@Autowired
private KafkaWriterService service;

@Test
public void testMessages() {
    BlockingQueue<Message<?>> messages = collector.forChannel(source.output());
    sink.input().send(MessageBuilder.withPayload("Hello World!").build());

    Object payload = messages.poll().getPayload();
    System.out.println(payload);
}

}

相关问题