我试图通过设置偏移量来使用主题中的数据,但得到Assert错误-
from kafka import KafkaConsumer
consumer = KafkaConsumer('foobar1',
bootstrap_servers=['localhost:9092'])
print 'process started'
print consumer.partitions_for_topic('foobar1')
print 'done'
consumer.seek(0,10)
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
print 'process ended'
error:-
Traceback (most recent call last):
File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module>
consumer.seek(0,10)
File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
3条答案
按热度按时间omjgkv6w1#
在我的情况下
Kafka 0.9
以及kafka-python
,期间发生分区分配for message in consumer
. 因此,搜索操作应该在迭代之后进行。我用以下代码重置组的偏移:txu3uszq2#
下面是一个解决问题的示例:
参考:kafka consumer seek不工作:assertionerror:未分配分区
ruoxqz4g3#
在调用seek之前,必须使用topicpartitions列表调用consumer.assign()。还要注意seek的第一个参数也是一个topicpartition。参见kafkaconsumer api