defaultkafkaproducerfactory,在引导服务器关闭时具有transactionidprefix无休止的等待

daolsyd0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(321)

嗨,
我正在使用springkafka1.3.0.release创建一个事务生产者。当引导服务器关闭时,defaultkafkaproducerfactory会无休止地等待,直到引导服务器启动。
我做错什么了?我可以设置超时和/或其他类似属性吗?
这是我的代码的一个示例,用于重现场景:

public static void main(String[] args) {

    final DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());    

    producerFactory.setTransactionIdPrefix("transactionIdPrefix");

    final Producer<Object, Object> producer = producerFactory.createProducer();       

    System.out.println("Created producer:" + producer);  
}

private static Map<String, Object> producerConfigs() {

    final Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    props.put(ProducerConfig.RETRIES_CONFIG, 1);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000);    
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

   return props;
}
gxwragnw

gxwragnw1#

是工厂打电话引起的 initTransactions() 例如,如果没有足够的代理来支持事务日志复制因子,则在创建它之后在producer上。
我不知道为什么超时不适用于那个手术。
我们也许可以把工厂改成延期交货 initTransactions() 直到第一次 beginTransaction() -但这只会把问题推向下游。
我使用kafka 1.0.0客户端进行了测试(它可以与1.3.1或更高版本一起使用,目前是1.3.2),但仍然存在问题。我认为它应该尊重 TRANSACTION_TIMEOUT_CONFIG 但似乎不是。
我建议开一个关于Kafka·吉拉的问题。

相关问题