spring kafka异步发送调用块

tnkciper  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(571)

我使用的是springkafka版本1.2.1,当kafka服务器关闭/无法访问时,asynchronoussend calls会阻塞一段时间。似乎是tcp超时。代码如下:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

我快速地看了一下SpringKafka代码,它似乎只是将任务传递给kafka客户机库,将回调交互转换为未来的对象交互。看看kafka客户机库,代码变得更加复杂,我没有花时间去理解它,但我猜它可能在同一个线程中进行远程调用(至少是元数据?)。
作为一个用户,我期望返回未来的spring-kafka方法能够立即返回,即使远程kafka服务器无法访问。
任何确认,如果我的理解是错误的,或者这是一个错误将是欢迎的。现在我把它变成了异步的。
另一个问题是,SpringKafka文档一开始就说它提供了同步和异步发送方法。我找不到任何不返回未来的方法,可能文档需要更新。
如果需要的话,我很乐意提供更多的细节。谢谢。

zfciruhq

zfciruhq1#

下面的代码用于异步获取响应

ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
    Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
    new Thread(runnable).start();

  public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {

    @Override
    public void onFailure(Throwable exception) {
      log.error("unable to send message: " + exception.getMessage());
     }

     @Override
     public void onSuccess(SendResult<UUID, ScreeningEvent> result) {
       log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
     }
  }

   public class SendResult<K, V> {

     private final ProducerRecord<K, V> producerRecord;

     private final RecordMetadata recordMetadata;

     public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        this.producerRecord = producerRecord;
        this.recordMetadata = recordMetadata;
    }

    public ProducerRecord<K, V> getProducerRecord() {
       return this.producerRecord;
    }

    public RecordMetadata getRecordMetadata() {
       return this.recordMetadata;
    }

    @Override
    public String toString() {
       return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
    }

 }
7xzttuei

7xzttuei2#

除了配置类上的@enableasync注解外,调用此代码时还需要在方法上使用@async注解。
http://www.baeldung.com/spring-async
这里有一些代码片段。Kafka制作人配置:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

以及制作人本身:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}
jmp7cifd

jmp7cifd3#

只是想确定一下。是否应用了@enableasync注解?我想说的是,这可能是确定未来行为的关键

hts6caw3

hts6caw34#

如果我看看Kafka制作人本身,传达信息有两个部分:
将消息存储到内部缓冲区中。
从缓冲区上传信息到Kafka。
kafkaproducer对于第二部分是异步的,而不是第一部分。
send()方法仍然可以在第一部分被阻止,并最终引发timeoutexceptions,例如:
主题的元数据没有缓存或过时,因此生产者尝试从服务器获取元数据,以了解主题是否仍然存在以及它有多少个分区。
缓冲区已满(默认为32mb)。
如果服务器完全没有响应,您可能会遇到这两个问题。

更新:

我在Kafka2.2.1中测试并确认了这一点。在2.4和/或2.6中,这种行为似乎有所不同:kafka-3720

f3temu5u

f3temu5u5#

最好的解决方案是在生产者级别添加一个“回调”侦听器。

@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
    KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
    kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {

        @Override
        public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
            System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = " 
                    + recordMetadata.partition()  +" with offset= " + recordMetadata.offset()
                    + " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
        }

        @Override
        public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
            System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
            exception.printStackTrace();
        }
    });
    return kt;
}

相关问题