Kafka消费0.9向后兼容吗?

yrdbyhpb  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(353)

即将推出的kafka消费者0.9.x是否将与0.8 broker兼容?
换言之-是否可以只切换到新的使用者实现,而不涉及其他任何内容?

bkhjykvo

bkhjykvo1#

根据kafka0.9.0的文档,您不能使用新使用者从0.8.x代理读取数据。原因如下:
0.9.0.0对以前的版本进行了代理间协议更改。

lawou6xi

lawou6xi2#

基于这个消费者客户重新设计的wiki页面,
这将涉及对消费者API*的一些重大更改,因此我们希望从我们的社区收集关于该建议的反馈。由于更改的列表不小,我们想了解一些特性是否优于其他特性,更重要的是,一些特性是否完全不需要。

  • 我的。

我没有找到任何地方明确指出不兼容。但是引用这句话以及0.8中的producer与0.7中的producer不兼容的事实,我假设它们不兼容。

bq3bfh9z

bq3bfh9z3#

不可以。通常建议在客户端之前升级代理,因为代理的目标是向后兼容。0.9代理将同时使用0.8消费者API和0.9消费者API,但不是相反。

kuarbcqp

kuarbcqp4#

在Kafka看来,0.9.0内置了向后兼容性。检查http://kafka.apache.org/documentation.html#upgrade
文献引证
0.9.0.0具有潜在的突破性更改(请在升级之前进行检查)和以前版本的代理间协议更改。对于滚动升级:
更新所有代理上的server.properties文件并添加以下属性:inter.broker.protocol.version=0.8.2.x
升级代理。这可以由一个代理一次完成,只需将其关闭、更新代码并重新启动即可。
升级整个集群后,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0来升级协议版本。
逐个重新启动代理,使新协议版本生效

wtlkbnrh

wtlkbnrh5#

我最近遇到了一个类似的问题,在我的应用程序中,我不得不读Kafka0.9,然后写回Kafka0.8。我使用Kafka客户端0.9的方式如下。
消费者配置

props.put("bootstrap.servers", "brokers_ip as comma seperated values");
    props.put("group.id", "your group id");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("session.timeout.ms", 30000);
    consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe("List of topics to subscribe too");

生产者配置

Properties props = new Properties();
        props.put("bootstrap.servers","list of broker ips");
        props.put("metadata.broker.list", "list of broker ips");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, message);
        producer.send(data);
        producer.close();

希望这有帮助。

相关问题