我正在尝试用kafkapython构建一个应用程序,在这个应用程序中,使用者从一系列主题中读取数据。非常重要的一点是,消费者绝不会重复阅读同一条信息,也绝不会错过一条信息。
一切似乎都很正常,除了我关闭耗电元件(例如故障)并尝试从偏移量开始读取。我只能读取该主题中的所有消息(这会导致重复读取),或者只侦听新消息(并且错过在崩溃期间发出的消息)。我在暂停消费者时没有遇到此问题。
为了解决这个问题,我创建了一个独立的模拟。
通用生产商:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
x=0 # set manually to avoid duplicates
for e in range(1000):
if e <= x:
pass
else:
data = dumps(
{
'number' : e
}
).encode('utf-8')
producer.send('numtest', value=data)
print(e, ' send.')
sleep(5)
以及消费者。如果 auto_offset_reset
设置为 'earliest'
,将重新读取所有消息。如果 auto_offset_reset
设置为 'latest'
,在停机期间不会读取任何消息。
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads
## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000)
## Connect to database
client = MongoClient('localhost:27017')
collection = client.counttest.counttest
# Send data
for message in consumer:
message = loads(message.value.decode('utf-8'))
collection.insert_one(message)
print('{} added to {}'.format(message, collection))
我觉得自动提交没有正常工作。
我知道这个问题和这个问题很相似,但我想要一个具体的解决方案。
谢谢你帮我。
2条答案
按热度按时间kdfy810k1#
是否可以从不同的服务器使用使用者。我已经试过了,下面是相同的代码,它没有从Kafka获取任何数据。
note:- when 我给了错误的ip或端口号。
sxissh062#
您得到这种行为是因为您的消费者没有使用消费者组。有了消费者群体,消费者会定期向Kafka承诺(保留)自己的立场。这样,如果重新启动,它将从上次提交的位置恢复。
要使您的消费者使用消费者组,您需要设置
group_id
在建造它的时候。看到了吗group_id
文件说明:用于动态分区分配(如果启用)以及用于获取和提交偏移量的使用者组的名称。如果没有,则禁用自动分区分配(通过组协调器)和偏移提交。默认值:无
例如: