@streamlistener未收到来自kafka主题的消息

sirbozc5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(779)

我可以使用以下代码发送和接收信息:

@EnableBinding(Processor.class)
public class KafkaStreamsConfiguration {
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String processMessage(String message) {
    System.out.println("message = " + message);
    return message.replaceAll("my", "your");
  }
}

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class StreamApplicationIT {
private static String topicToPublish = "eventUpdateFromEventModel";

@BeforeClass
public static void setup() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}

@Autowired
private KafkaMessageSender<String> kafkaMessageSenderToTestErrors;

@Autowired
private KafkaMessageSender<EventNotificationDto> kafkaMessageSender;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topicToPublish);

@Autowired
private Processor pipe;

@Autowired
private MessageCollector messageCollector;

@Rule
public OutputCapture outputCapture = new OutputCapture();

@Test
public void working() {
    pipe.input()
            .send(MessageBuilder.withPayload("This is my message")
                    .build());

    Object payload = messageCollector.forChannel(pipe.output())
            .poll()
            .getPayload();

    assertEquals("This is your message", payload.toString());
}

@Test
public void non_working() {
    kafkaMessageSenderToTestErrors.send(topicToPublish, "This was my message");
    assertTrue(isMessageReceived("This was your message", 50));
}

private boolean isMessageReceived(final String msg, final int maxAttempt) {
    return IntStream.rangeClosed(0, maxAttempt)
            .peek(a -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    fail();
                }
            }).anyMatch(i -> outputCapture.toString().contains(msg));
}

}

@Service
@Slf4j
public class KafkaMessageSender<T> {
    private final KafkaTemplate<String, byte[]> kafkaTemplate;
    private final ObjectWriter objectWriter;

    public KafkaMessageSender(KafkaTemplate<String, byte[]> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectWriter = objectMapper.writer();
    }

    public void send(String topicName, T payload) {
        try {
            kafkaTemplate.send(topicName, objectWriter.writeValueAsString(payload).getBytes());
        } catch (JsonProcessingException e) {
            log.info("error converting object into byte array {}", payload.toString().substring(0, 50));
        }
        log.info("sent payload to topic='{}'", topicName);
    }
}

但是,当我使用kafkatemplate向任何主题发送消息时,streamlistener都不会收到消息。

spring.cloud.stream.bindings.input.group=test
spring.cloud.stream.bindings.input.destination=eventUpdateFromEventModel

我的pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>

    <!-- Spring boot version -->
    <spring.boot.version>1.5.7.RELEASE</spring.boot.version>
    <spring-cloud.version>Edgware.SR3</spring-cloud.version>

<dependencyManagement>
      <dependencies>

        <dependency>
            <!-- Import dependency management from Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

    </dependencies>
</dependencyManagement>
yws3nbqq

yws3nbqq1#

工作

Object payload = messageCollector.forChannel(pipe.output())
        .poll()
        .getPayload();

...
不起作用

KafkaTemplate

这是因为您在测试中使用的是testbinder,而不是真正的kafka代理和kafka binder。
消息收集器只是从通道中获取它。如果您想使用真正的kafka代理进行测试,请参阅testembedded kafka示例应用程序。
编辑
我刚刚测试了示例的ditmars(boot1.5.x)版本,它运行良好。。。

相关问题