如何使用kafka.consumer.simpleconsumer,seek()

cgh8pdjw  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(377)

api文档是here:httphttp://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html
但当我运行以下代码时,异常是%d格式:需要数字,而不是nonetype

client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "test-group", "test")
    consumer.seek(0, whence=None)# (0,2) and (0,0)
    run = True
    while( run ):
        message = consumer.get_message(block=False, timeout=4000)

    except Exception as e:
        print "Exception while trying to read msg:", str(e)

当我使用下面的代码时,异常是seek()得到了一个意外的关键字参数'partition'

consumer.seek(0, whence=None, partition=None)# (0,2) and (0,0)

你知道吗?谢谢。

qyuhtwio

qyuhtwio1#

在Kafka权威指南中,有一个 seek() 用java编写(不是用python编写的,但我希望您能理解其中的大意)。

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

         public void onPartitionsRevoked (Collection <TopicPartition> partitions) {
                  commitDBTransaction();
         }

         public void onPartitionsAssigned(Collection <TopicPartiton> partitions) {
              for(TopicPartition partition : partitions)
                  consumer.seek(partition, getOffsetFromDB(partition));
         }

     }
}   // these brackets are exactly the same as the book. I didn't change anything. You might want to though.    

   consumer.subscribe (topics, new SaveOffsetOnRebalance(consumer));
   consumer.poll(0);

   for ( TopicPartition partition : consumer.assignment())
       consumer.seek(partition, getOffsetFromDB(partition));

   while (true) {
         ConsumerRecords <String, String> records = consumer.poll(100);
         for (ConsumerRecord <String, String> record : records)
         { 
               processRecord(record);
               storeRecordInDB(record);
               storeOffsetInDB(record.topic(), record.partition(), record.offset());
         }
         commitDBTransaction();
   }

相关问题