assertionerror:未分配分区

hyrbngr7  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(1184)

我试图通过设置偏移量来使用主题中的数据,但得到Assert错误-

from kafka import KafkaConsumer

consumer = KafkaConsumer('foobar1',
                         bootstrap_servers=['localhost:9092'])
print 'process started'
print consumer.partitions_for_topic('foobar1')
print 'done'
consumer.seek(0,10)

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
print 'process ended'

error:-

Traceback (most recent call last):
  File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module>
    consumer.seek(0,10)
  File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
omjgkv6w

omjgkv6w1#

在我的情况下 Kafka 0.9 以及 kafka-python ,期间发生分区分配 for message in consumer . 因此,搜索操作应该在迭代之后进行。我用以下代码重置组的偏移:

import kafka

ps = []
for i in xrange(topic_partition_number):
    ps.append(kafka.TopicPartition(topic, i))

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group)
for msg in consumer:
    print msg
    consumer.seek_to_beginning(*ps)
    consumer.commit()
    break
txu3uszq

txu3uszq2#

下面是一个解决问题的示例:

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)

参考:kafka consumer seek不工作:assertionerror:未分配分区

ruoxqz4g

ruoxqz4g3#

在调用seek之前,必须使用topicpartitions列表调用consumer.assign()。还要注意seek的第一个参数也是一个topicpartition。参见kafkaconsumer api

相关问题