kafka python consumer从偏移量开始读取(自动)

dgsult0t  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(686)

我正在尝试用kafkapython构建一个应用程序,在这个应用程序中,使用者从一系列主题中读取数据。非常重要的一点是,消费者绝不会重复阅读同一条信息,也绝不会错过一条信息。
一切似乎都很正常,除了我关闭耗电元件(例如故障)并尝试从偏移量开始读取。我只能读取该主题中的所有消息(这会导致重复读取),或者只侦听新消息(并且错过在崩溃期间发出的消息)。我在暂停消费者时没有遇到此问题。
为了解决这个问题,我创建了一个独立的模拟。
通用生产商:

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

x=0 # set manually to avoid duplicates 

for e in range(1000):
    if e <= x:
        pass
    else:
        data = dumps(
            {
            'number' : e
        }
        ).encode('utf-8')

        producer.send('numtest', value=data)
        print(e, ' send.')

        sleep(5)

以及消费者。如果 auto_offset_reset 设置为 'earliest' ,将重新读取所有消息。如果 auto_offset_reset 设置为 'latest' ,在停机期间不会读取任何消息。

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)

consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000)

## Connect to database

client = MongoClient('localhost:27017')
collection = client.counttest.counttest

# Send data

for message in consumer:
    message = loads(message.value.decode('utf-8'))
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))

我觉得自动提交没有正常工作。
我知道这个问题和这个问题很相似,但我想要一个具体的解决方案。
谢谢你帮我。

kdfy810k

kdfy810k1#

是否可以从不同的服务器使用使用者。我已经试过了,下面是相同的代码,它没有从Kafka获取任何数据。

consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
                     auto_offset_reset='earliest', enable_auto_commit=True,
                     auto_commit_interval_ms=1000, group_id=None)

note:- when 我给了错误的ip或端口号。

sxissh06

sxissh062#

您得到这种行为是因为您的消费者没有使用消费者组。有了消费者群体,消费者会定期向Kafka承诺(保留)自己的立场。这样,如果重新启动,它将从上次提交的位置恢复。
要使您的消费者使用消费者组,您需要设置 group_id 在建造它的时候。看到了吗 group_id 文件说明:
用于动态分区分配(如果启用)以及用于获取和提交偏移量的使用者组的名称。如果没有,则禁用自动分区分配(通过组协调器)和偏移提交。默认值:无
例如:

consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000, group_id='my-group')

相关问题