嗨,
我正在使用springkafka1.3.0.release创建一个事务生产者。当引导服务器关闭时,defaultkafkaproducerfactory会无休止地等待,直到引导服务器启动。
我做错什么了?我可以设置超时和/或其他类似属性吗?
这是我的代码的一个示例,用于重现场景:
public static void main(String[] args) {
final DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix("transactionIdPrefix");
final Producer<Object, Object> producer = producerFactory.createProducer();
System.out.println("Created producer:" + producer);
}
private static Map<String, Object> producerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
return props;
}
1条答案
按热度按时间gxwragnw1#
是工厂打电话引起的
initTransactions()
例如,如果没有足够的代理来支持事务日志复制因子,则在创建它之后在producer上。我不知道为什么超时不适用于那个手术。
我们也许可以把工厂改成延期交货
initTransactions()
直到第一次beginTransaction()
-但这只会把问题推向下游。我使用kafka 1.0.0客户端进行了测试(它可以与1.3.1或更高版本一起使用,目前是1.3.2),但仍然存在问题。我认为它应该尊重
TRANSACTION_TIMEOUT_CONFIG
但似乎不是。我建议开一个关于Kafka·吉拉的问题。